From 3169213484dec445b506930aae8abc219496685c Mon Sep 17 00:00:00 2001 From: Buchi Reddy Busi Reddy Date: Mon, 14 Dec 2020 18:20:57 -0800 Subject: [PATCH 1/4] Adding a new API to bulkUpsertAndGet the documents in the doc store. This will help the clients to build more interesting use cases and also cutdown a roundtrip to the server in the cases where they need to immediately get back the upserted documents. --- .../mongo/MongoDocStoreTest.java | 22 +++++ .../postgres/PostgresDocStoreTest.java | 44 +++++++++- .../core/documentstore/Collection.java | 6 ++ .../documentstore/mongo/MongoCollection.java | 62 ++++++++++---- .../postgres/PostgresCollection.java | 84 ++++++++++++++----- 5 files changed, 180 insertions(+), 38 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index 20ed32e4..37924426 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -394,6 +394,28 @@ public void testBulkUpsert() { assertEquals(1, collection.count()); } + @Test + public void testBulkUpsertAndReturn() throws IOException { + datastore.createCollection(COLLECTION_NAME, null); + Collection collection = datastore.getCollection(COLLECTION_NAME); + Map documentMap = Map.of( + new SingleValueKey("default", "testKey1"), createDocument("testKey1", "abc1"), + new SingleValueKey("default", "testKey2"), createDocument("testKey2", "abc2") + ); + + Iterator iterator = collection.bulkUpsertAndReturn(documentMap); + assertEquals(2, collection.count()); + List documents = new ArrayList<>(); + while (iterator.hasNext()) { + documents.add(iterator.next()); + } + assertEquals(2, documents.size()); + + // Delete one of the documents and test again. 
+ collection.delete(new SingleValueKey("default", "testKey1")); + assertEquals(1, collection.count()); + } + @Test public void testLike() { MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017"); diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java index 7e8c888b..db859cd3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java @@ -119,7 +119,7 @@ public void testUpsertAndReturn() throws IOException { } @Test - public void testBulkUpsert() throws IOException { + public void testBulkUpsert() { Collection collection = datastore.getCollection(COLLECTION_NAME); Map bulkMap = new HashMap<>(); bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob")); @@ -153,6 +153,46 @@ public void testBulkUpsert() throws IOException { } } + @Test + public void testBulkUpsertAndReturn() throws IOException { + Collection collection = datastore.getCollection(COLLECTION_NAME); + Map bulkMap = new HashMap<>(); + bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob")); + bulkMap.put(new SingleValueKey("default", "testKey2"), createDocument("name", "Alice")); + bulkMap.put(new SingleValueKey("default", "testKey3"), createDocument("name", "Alice")); + bulkMap.put(new SingleValueKey("default", "testKey4"), createDocument("name", "Bob")); + bulkMap.put(new SingleValueKey("default", "testKey5"), createDocument("name", "Alice")); + bulkMap.put( + new SingleValueKey("default", "testKey6"), createDocument("email", "bob@example.com")); + + Iterator iterator = collection.bulkUpsertAndReturn(bulkMap); + List documents = new ArrayList<>(); + while (iterator.hasNext()) { + documents.add(iterator.next()); + } + Assertions.assertEquals(6, documents.size()); + + { + // empty query returns all the documents + Query query = new Query(); + Assertions.assertEquals(6, collection.total(query)); + } + + { + Query query = new Query(); + query.setFilter(Filter.eq("name", "Bob")); + Assertions.assertEquals(2, collection.total(query)); + } + + { + // limit should not affect the total + Query query = new Query(); + query.setFilter(Filter.eq("name", "Bob")); + query.setLimit(1); + Assertions.assertEquals(2, collection.total(query)); + } + } + @Test public void testSubDocumentUpdate() throws IOException { Collection collection = datastore.getCollection(COLLECTION_NAME); @@ -271,7 +311,7 @@ public void testSearch() throws IOException { query.setFilter(new Filter(Filter.Op.EQ, DOCUMENT_ID, key.toString())); Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } Assertions.assertEquals(documents.size(), 1); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 85b89cc5..1da5a0c1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -80,6 +80,12 @@ public interface Collection { */ boolean bulkUpsert(Map documents); + /** + * Method to bulkUpsert the 
given documents and return the latest copies of those documents. + * This helps the clients to avoid an additional round trip. + */ + Iterator bulkUpsertAndReturn(Map documents) throws IOException; + /** * Drops a collections */ diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 11edb224..0acb1486 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import net.jodah.failsafe.Failsafe; @@ -386,28 +387,56 @@ public long total(Query query) { @Override public boolean bulkUpsert(Map documents) { try { - List> bulkCollection = new ArrayList<>(); - for (Entry entry : documents.entrySet()) { - Key key = entry.getKey(); - // insert or overwrite - bulkCollection.add(new UpdateOneModel<>( - this.selectionCriteriaForKey(key), - prepareUpsert(key, entry.getValue()), - new UpdateOptions().upsert(true))); - } - - BulkWriteResult result = Failsafe.with(bulkWriteRetryPolicy) - .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false))); + BulkWriteResult result = bulkUpsertImpl(documents); LOGGER.debug(result.toString()); - return true; - } catch (IOException | MongoServerException e) { LOGGER.error("Error during bulk upsert for documents:{}", documents, e); return false; } } + private BulkWriteResult bulkUpsertImpl(Map documents) throws JsonProcessingException { + List> bulkCollection = new ArrayList<>(); + for (Entry entry : documents.entrySet()) { + Key key = entry.getKey(); + // insert or overwrite + bulkCollection.add(new UpdateOneModel<>( + this.selectionCriteriaForKey(key), + prepareUpsert(key, entry.getValue()), + new UpdateOptions().upsert(true))); + } + + return Failsafe.with(bulkWriteRetryPolicy) + .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false))); + } + + @Override + public Iterator bulkUpsertAndReturn(Map documents) throws IOException { + try { + BulkWriteResult result = bulkUpsertImpl(documents); + LOGGER.debug(result.toString()); + + FindIterable cursor = collection.find(selectionCriteriaForKeys(documents.keySet())); + final MongoCursor mongoCursor = cursor.cursor(); + return new Iterator<>() { + + @Override + public boolean hasNext() { + return mongoCursor.hasNext(); + } + + @Override + public Document next() { + return MongoCollection.this.dbObjectToDocument(mongoCursor.next()); + } + }; + } catch (JsonProcessingException e) { + LOGGER.error("Error during bulk upsert for documents:{}", documents, e); + throw new IOException("Error during bulk upsert."); + } + } + @Override public void drop() { collection.drop(); @@ -417,6 +446,11 @@ private BasicDBObject selectionCriteriaForKey(Key key) { return new BasicDBObject(ID_KEY, key.toString()); } + private BasicDBObject selectionCriteriaForKeys(Set keys) { + return new BasicDBObject(Map.of(ID_KEY, new BasicDBObject("$in", + keys.stream().map(Key::toString).collect(Collectors.toList())))); + } + private Document dbObjectToDocument(BasicDBObject dbObject) { try { // Hack: Remove the _id field since it's an unrecognized field for Proto layer. 
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index a6b7b4f6..265e27ed 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -49,8 +49,8 @@ public class PostgresCollection implements Collection { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String DOC_PATH_SEPARATOR = "\\."; - private Connection client; - private String collectionName; + private final Connection client; + private final String collectionName; public PostgresCollection(Connection client, String collectionName) { this.client = client; @@ -77,11 +77,11 @@ public boolean upsert(Key key, Document document) throws IOException { } } - @Override /** * For Postgres upsertAndReturn functionality is not supported directly. * So using direct upsert and then return document. */ + @Override public Document upsertAndReturn(Key key, Document document) throws IOException { try { boolean upsert = upsert(key, document); @@ -92,7 +92,6 @@ public Document upsertAndReturn(Key key, Document document) throws IOException { } } - @Override /** * Update the sub document based on subDocPath based on longest key match. * @@ -119,6 +118,7 @@ public Document upsertAndReturn(Key key, Document document) throws IOException { * address.street.longitude.degree * */ + @Override public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { String updateSubDocSQL = String .format("UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s=?", @@ -238,7 +238,7 @@ protected String parseQueryForNonCompositeFilter(Filter filter) { .stream() .map(val -> "'" + val + "'") .collect(Collectors.joining(", ")); - return filterString.append("(" + collect + ")").toString(); + return filterString.append("(").append(collect).append(")").toString(); case CONTAINS: // TODO: Matches condition inside an array of documents case EXISTS: @@ -307,13 +307,11 @@ private String getFieldPrefix(String fieldName) { } private String parseOrderByQuery(List orderBys) { - String orderBySQL = orderBys + return orderBys .stream() .map(orderBy -> getFieldPrefix(orderBy.getField()) + " " + (orderBy.isAsc() ? "ASC" : "DESC")) .filter(str -> !StringUtils.isEmpty(str)) .collect(Collectors.joining(" , ")); - - return orderBySQL; } @Override @@ -412,27 +410,69 @@ public long total(Query query) { @Override public boolean bulkUpsert(Map documents) { try { - PreparedStatement preparedStatement = client - .prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS); - for (Map.Entry entry : documents.entrySet()) { + int[] updateCounts = bulkUpsertImpl(documents); - Key key = entry.getKey(); - String jsonString = prepareUpsertDocument(key, entry.getValue()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); + } - preparedStatement.setString(1, key.toString()); - preparedStatement.setString(2, jsonString); - preparedStatement.setString(3, jsonString); + return true; + } catch (BatchUpdateException e) { + LOGGER.error("BatchUpdateException bulk inserting documents.", e); + } catch (SQLException e) { + LOGGER.error("SQLException bulk inserting documents. 
SQLState: {} Error Code:{}", + e.getSQLState(), e.getErrorCode(), e); + } catch (IOException e) { + LOGGER.error("SQLException bulk inserting documents. documents: {}", documents, e); + } - preparedStatement.addBatch(); - } + return false; + } + + private int[] bulkUpsertImpl(Map documents) throws SQLException, IOException { + PreparedStatement preparedStatement = client + .prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS); + for (Map.Entry entry : documents.entrySet()) { + + Key key = entry.getKey(); + String jsonString = prepareUpsertDocument(key, entry.getValue()); + + preparedStatement.setString(1, key.toString()); + preparedStatement.setString(2, jsonString); + preparedStatement.setString(3, jsonString); + + preparedStatement.addBatch(); + } + + return preparedStatement.executeBatch(); + } - int[] updateCounts = preparedStatement.executeBatch(); + @Override + public Iterator bulkUpsertAndReturn(Map documents) throws IOException { + try { + int[] updateCounts = bulkUpsertImpl(documents); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); } - return true; + String collect = documents.keySet().stream() + .map(val -> "'" + val.toString() + "'") + .collect(Collectors.joining(", ")); + + String space = " "; + String query = new StringBuilder("SELECT * FROM") + .append(space).append(collectionName) + .append(" WHERE ").append(ID).append(" IN ") + .append("(").append(collect).append(")").toString(); + + try { + PreparedStatement preparedStatement = client.prepareStatement(query); + ResultSet resultSet = preparedStatement.executeQuery(); + return new PostgresResultIterator(resultSet); + } catch (SQLException e) { + LOGGER.error("SQLException querying documents. query: {}", query, e); + } } catch (BatchUpdateException e) { LOGGER.error("BatchUpdateException bulk inserting documents.", e); } catch (SQLException e) { @@ -442,7 +482,7 @@ public boolean bulkUpsert(Map documents) { LOGGER.error("SQLException bulk inserting documents. documents: {}", documents, e); } - return false; + throw new IOException("Could not bulk upsert the documents."); } private String prepareUpsertDocument(Key key, Document document) throws IOException { @@ -472,7 +512,7 @@ public void drop() { } } - class PostgresResultIterator implements Iterator { + static class PostgresResultIterator implements Iterator { private final ObjectMapper MAPPER = new ObjectMapper(); ResultSet resultSet; From b36a521f9e572483fae547b2f24e4e9dc25b7ca1 Mon Sep 17 00:00:00 2001 From: Buchi Reddy Busi Reddy Date: Tue, 15 Dec 2020 15:08:09 -0800 Subject: [PATCH 2/4] Changing the API to returnAndBulkUpsert() because that's the use case. 
--- .../mongo/MongoDocStoreTest.java | 11 +++++++--- .../postgres/PostgresDocStoreTest.java | 19 +++++++++++------- .../core/documentstore/Collection.java | 7 ++++--- .../documentstore/mongo/MongoCollection.java | 10 ++++++---- .../postgres/PostgresCollection.java | 20 ++++++++----------- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index 37924426..02615786 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -112,8 +112,8 @@ public void testIgnoreCaseLikeQuery() throws IOException { String persistedDocument = documents.get(0).toJson(); JsonNode jsonNode = OBJECT_MAPPER.reader().readTree(persistedDocument); Assertions.assertTrue(persistedDocument.contains("Bob")); - Assertions.assertTrue(jsonNode.findValue("createdTime").asLong(0) > now); - Assertions.assertTrue(jsonNode.findValue("lastUpdatedTime").asLong(0) > now); + Assertions.assertTrue(jsonNode.findValue("createdTime").asLong(0) >= now); + Assertions.assertTrue(jsonNode.findValue("lastUpdatedTime").asLong(0) >= now); } } @@ -403,7 +403,12 @@ public void testBulkUpsertAndReturn() throws IOException { new SingleValueKey("default", "testKey2"), createDocument("testKey2", "abc2") ); - Iterator iterator = collection.bulkUpsertAndReturn(documentMap); + Iterator iterator = collection.returnAndBulkUpsert(documentMap); + // Initially there shouldn't be any documents. + Assertions.assertFalse(iterator.hasNext()); + + // The operation should be idempotent so go ahead and try again. + iterator = collection.returnAndBulkUpsert(documentMap); assertEquals(2, collection.count()); List documents = new ArrayList<>(); while (iterator.hasNext()) { diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java index db859cd3..1b4aa44e 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java @@ -99,7 +99,7 @@ public void testUpsert() throws IOException { query.setFilter(Filter.eq(ID, "default:testKey")); Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } Assertions.assertFalse(documents.isEmpty()); @@ -165,7 +165,12 @@ public void testBulkUpsertAndReturn() throws IOException { bulkMap.put( new SingleValueKey("default", "testKey6"), createDocument("email", "bob@example.com")); - Iterator iterator = collection.bulkUpsertAndReturn(bulkMap); + Iterator iterator = collection.returnAndBulkUpsert(bulkMap); + // Initially there shouldn't be any documents. + Assertions.assertFalse(iterator.hasNext()); + + // The operation should be idempotent, so go ahead and try again. 
+ iterator = collection.returnAndBulkUpsert(bulkMap); List documents = new ArrayList<>(); while (iterator.hasNext()) { documents.add(iterator.next()); @@ -209,7 +214,7 @@ public void testSubDocumentUpdate() throws IOException { query.setFilter(Filter.eq(ID, "default:testKey")); Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } Assertions.assertFalse(documents.isEmpty()); @@ -269,7 +274,7 @@ public void testDeleteAll() throws IOException { } @Test - public void testDrop() throws IOException { + public void testDrop() { Collection collection = datastore.getCollection(COLLECTION_NAME); Assertions.assertTrue(datastore.listCollections().contains("postgres." + COLLECTION_NAME)); @@ -289,7 +294,7 @@ public void testIgnoreCaseLikeQuery() throws IOException { query.setFilter(new Filter(Filter.Op.LIKE, "name", searchValue)); Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } Assertions.assertFalse(documents.isEmpty()); @@ -331,7 +336,7 @@ public void testSearchForNestedKey() throws IOException { .setFilter(new Filter(Filter.Op.EQ, "attributes.span_id.value.string", "6449f1f720c93a67")); Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } Assertions.assertEquals(documents.size(), 1); @@ -387,7 +392,7 @@ public void testOffsetLimitAndOrderBY() throws IOException { Iterator results = collection.search(query); List documents = new ArrayList<>(); - for (; results.hasNext(); ) { + while (results.hasNext()) { documents.add(results.next()); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 1da5a0c1..2bf71589 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -81,10 +81,11 @@ public interface Collection { boolean bulkUpsert(Map documents); /** - * Method to bulkUpsert the given documents and return the latest copies of those documents. - * This helps the clients to avoid an additional round trip. + * Method to bulkUpsert the given documents and return the previous copies of those documents. + * This helps the clients to see how the documents were prior to upserting them and do that + * in one less round trip. */ - Iterator bulkUpsertAndReturn(Map documents) throws IOException; + Iterator returnAndBulkUpsert(Map documents) throws IOException; /** * Drops a collections diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 0acb1486..d6bbcc29 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -412,15 +412,17 @@ private BulkWriteResult bulkUpsertImpl(Map documents) throws Json } @Override - public Iterator bulkUpsertAndReturn(Map documents) throws IOException { + public Iterator returnAndBulkUpsert(Map documents) throws IOException { try { + // First get all the documents for the given keys. 
+ FindIterable cursor = collection.find(selectionCriteriaForKeys(documents.keySet())); + final MongoCursor mongoCursor = cursor.cursor(); + + // Now go ahead and do the bulk upsert. BulkWriteResult result = bulkUpsertImpl(documents); LOGGER.debug(result.toString()); - FindIterable cursor = collection.find(selectionCriteriaForKeys(documents.keySet())); - final MongoCursor mongoCursor = cursor.cursor(); return new Iterator<>() { - @Override public boolean hasNext() { return mongoCursor.hasNext(); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 265e27ed..c08601d4 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -448,14 +448,8 @@ private int[] bulkUpsertImpl(Map documents) throws SQLException, } @Override - public Iterator bulkUpsertAndReturn(Map documents) throws IOException { + public Iterator returnAndBulkUpsert(Map documents) throws IOException { try { - int[] updateCounts = bulkUpsertImpl(documents); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); - } - String collect = documents.keySet().stream() .map(val -> "'" + val.toString() + "'") .collect(Collectors.joining(", ")); @@ -469,15 +463,17 @@ public Iterator bulkUpsertAndReturn(Map documents) thro try { PreparedStatement preparedStatement = client.prepareStatement(query); ResultSet resultSet = preparedStatement.executeQuery(); + + // Now go ahead and bulk upsert the documents. + int[] updateCounts = bulkUpsertImpl(documents); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Write result: " + Arrays.toString(updateCounts)); + } + return new PostgresResultIterator(resultSet); } catch (SQLException e) { LOGGER.error("SQLException querying documents. query: {}", query, e); } - } catch (BatchUpdateException e) { - LOGGER.error("BatchUpdateException bulk inserting documents.", e); - } catch (SQLException e) { - LOGGER.error("SQLException bulk inserting documents. SQLState: {} Error Code:{}", - e.getSQLState(), e.getErrorCode(), e); } catch (IOException e) { LOGGER.error("SQLException bulk inserting documents. documents: {}", documents, e); } From b0207829efc8371576cf749343c248ea7b3ac6aa Mon Sep 17 00:00:00 2001 From: Buchi Reddy Busi Reddy Date: Wed, 16 Dec 2020 15:01:25 -0800 Subject: [PATCH 3/4] Improving the assertions on the integration tests. 
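
The stronger assertions diff each returned document against the originally upserted copy
using json-patch's JsonDiff, and verify that the resulting patch contains only "add"
operations: the store is allowed to add fields on upsert (e.g. createdTime and
lastUpdatedTime), but nothing from the original document may be changed or removed.
A small illustration of that check, as a fragment with made-up document contents
(exception handling omitted):

    ObjectMapper mapper = new ObjectMapper();
    JsonNode written = mapper.readTree("{\"id\":\"1\",\"testKey1\":\"abc-v1\"}");
    JsonNode stored  = mapper.readTree(
        "{\"id\":\"1\",\"testKey1\":\"abc-v1\",\"createdTime\":1608163200000}");
    // JsonDiff.asJson produces an RFC 6902 patch: an array of {"op": ..., "path": ...} nodes.
    JsonNode patch = JsonDiff.asJson(written, stored);
    patch.forEach(op -> System.out.println(op.get("op").asText())); // prints only "add"
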
--- document-store/build.gradle.kts | 1 + .../mongo/MongoDocStoreTest.java | 69 +++++++++++++++++-- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/document-store/build.gradle.kts b/document-store/build.gradle.kts index e2e1372c..bd69e53c 100644 --- a/document-store/build.gradle.kts +++ b/document-store/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { testImplementation("org.junit.jupiter:junit-jupiter:5.6.2") testImplementation("org.mockito:mockito-core:2.19.0") integrationTestImplementation("org.junit.jupiter:junit-jupiter:5.6.2") + integrationTestImplementation("com.github.java-json-tools:json-patch:1.13") } tasks.test { diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index 02615786..d8cd8631 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -5,9 +5,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.fge.jsonpatch.diff.JsonDiff; import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.client.FindIterable; @@ -22,9 +24,13 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DatastoreProvider; @@ -395,20 +401,24 @@ public void testBulkUpsert() { } @Test - public void testBulkUpsertAndReturn() throws IOException { + public void testReturnAndBulkUpsert() throws IOException { datastore.createCollection(COLLECTION_NAME, null); Collection collection = datastore.getCollection(COLLECTION_NAME); - Map documentMap = Map.of( - new SingleValueKey("default", "testKey1"), createDocument("testKey1", "abc1"), - new SingleValueKey("default", "testKey2"), createDocument("testKey2", "abc2") + Map documentMapV1 = Map.of( + new SingleValueKey("default", "testKey1"), createDocument("id", "1", "testKey1", "abc-v1"), + new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v1") ); - Iterator iterator = collection.returnAndBulkUpsert(documentMap); + Iterator iterator = collection.returnAndBulkUpsert(documentMapV1); // Initially there shouldn't be any documents. Assertions.assertFalse(iterator.hasNext()); - // The operation should be idempotent so go ahead and try again. - iterator = collection.returnAndBulkUpsert(documentMap); + // Add more details to the document and bulk upsert again. 
+ Map documentMapV2 = Map.of( + new SingleValueKey("default", "testKey1"), createDocument("id", "1", "testKey1", "abc-v2"), + new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v2") + ); + iterator = collection.returnAndBulkUpsert(documentMapV2); assertEquals(2, collection.count()); List documents = new ArrayList<>(); while (iterator.hasNext()) { @@ -416,11 +426,48 @@ public void testBulkUpsertAndReturn() throws IOException { } assertEquals(2, documents.size()); + Map expectedDocs = convertToMap(documentMapV1.values(), "id"); + Map actualDocs = convertToMap(documents, "id"); + + // Verify that the documents returned were previous copies. + for (Map.Entry entry: expectedDocs.entrySet()) { + JsonNode expected = entry.getValue(); + JsonNode actual = actualDocs.get(entry.getKey()); + + Assertions.assertNotNull(actual); + JsonNode patch = JsonDiff.asJson(expected, actual); + + // Verify that there are only additions and "no" removals in this new node. + Set ops = new HashSet<>(); + patch.elements().forEachRemaining(e -> { + if (e.has("op")) { + ops.add(e.get("op").asText()); + } + }); + + Assertions.assertTrue(ops.contains("add")); + Assertions.assertEquals(1, ops.size()); + } + // Delete one of the documents and test again. collection.delete(new SingleValueKey("default", "testKey1")); assertEquals(1, collection.count()); } + private Map convertToMap(java.util.Collection docs, String key) { + return docs.stream() + .map(d -> { + try { + return OBJECT_MAPPER.reader().readTree(d.toJson()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toMap(d -> d.get(key).asText(), d -> d)); + } + @Test public void testLike() { MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017"); @@ -458,6 +505,14 @@ public void testLike() { assertEquals(1, results.size()); } + private Document createDocument(String ...keys) { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + for (int i = 0; i < keys.length - 1; i++) { + objectNode.put(keys[i], keys[i + 1]); + } + return new JSONDocument(objectNode); + } + private Document createDocument(String key, String value) { ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); objectNode.put(key, value); From 37ba36493e4b7aa8530f7acf9378b84cff58a4ba Mon Sep 17 00:00:00 2001 From: Buchi Reddy Busi Reddy Date: Thu, 17 Dec 2020 08:52:42 -0800 Subject: [PATCH 4/4] Renaming the method. 
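
The method is now called bulkUpsertAndReturnOlderDocuments() so that the name states both
what is written and what is returned. A call site then reads as below (a sketch only:
createDocument is the integration-test helper and the key values are illustrative):

    Map<Key, Document> docs = Map.of(
        new SingleValueKey("default", "testKey1"), createDocument("name", "Bob"));
    // Returns the copies that were stored before this upsert; empty on the first insert.
    Iterator<Document> older = collection.bulkUpsertAndReturnOlderDocuments(docs);
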
--- .../core/documentstore/mongo/MongoDocStoreTest.java | 4 ++-- .../core/documentstore/postgres/PostgresDocStoreTest.java | 4 ++-- .../java/org/hypertrace/core/documentstore/Collection.java | 2 +- .../hypertrace/core/documentstore/mongo/MongoCollection.java | 2 +- .../core/documentstore/postgres/PostgresCollection.java | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java index d8cd8631..0d56449b 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/mongo/MongoDocStoreTest.java @@ -409,7 +409,7 @@ public void testReturnAndBulkUpsert() throws IOException { new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v1") ); - Iterator iterator = collection.returnAndBulkUpsert(documentMapV1); + Iterator iterator = collection.bulkUpsertAndReturnOlderDocuments(documentMapV1); // Initially there shouldn't be any documents. Assertions.assertFalse(iterator.hasNext()); @@ -418,7 +418,7 @@ public void testReturnAndBulkUpsert() throws IOException { new SingleValueKey("default", "testKey1"), createDocument("id", "1", "testKey1", "abc-v2"), new SingleValueKey("default", "testKey2"), createDocument("id", "2", "testKey2", "xyz-v2") ); - iterator = collection.returnAndBulkUpsert(documentMapV2); + iterator = collection.bulkUpsertAndReturnOlderDocuments(documentMapV2); assertEquals(2, collection.count()); List documents = new ArrayList<>(); while (iterator.hasNext()) { diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java index 1b4aa44e..5d11dd84 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java @@ -165,12 +165,12 @@ public void testBulkUpsertAndReturn() throws IOException { bulkMap.put( new SingleValueKey("default", "testKey6"), createDocument("email", "bob@example.com")); - Iterator iterator = collection.returnAndBulkUpsert(bulkMap); + Iterator iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); // Initially there shouldn't be any documents. Assertions.assertFalse(iterator.hasNext()); // The operation should be idempotent, so go ahead and try again. - iterator = collection.returnAndBulkUpsert(bulkMap); + iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); List documents = new ArrayList<>(); while (iterator.hasNext()) { documents.add(iterator.next()); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 2bf71589..626a4af5 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -85,7 +85,7 @@ public interface Collection { * This helps the clients to see how the documents were prior to upserting them and do that * in one less round trip. 
*/ - Iterator returnAndBulkUpsert(Map documents) throws IOException; + Iterator bulkUpsertAndReturnOlderDocuments(Map documents) throws IOException; /** * Drops a collections diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index d6bbcc29..451cb72a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -412,7 +412,7 @@ private BulkWriteResult bulkUpsertImpl(Map documents) throws Json } @Override - public Iterator returnAndBulkUpsert(Map documents) throws IOException { + public Iterator bulkUpsertAndReturnOlderDocuments(Map documents) throws IOException { try { // First get all the documents for the given keys. FindIterable cursor = collection.find(selectionCriteriaForKeys(documents.keySet())); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index c08601d4..6eaab35c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -448,7 +448,7 @@ private int[] bulkUpsertImpl(Map documents) throws SQLException, } @Override - public Iterator returnAndBulkUpsert(Map documents) throws IOException { + public Iterator bulkUpsertAndReturnOlderDocuments(Map documents) throws IOException { try { String collect = documents.keySet().stream() .map(val -> "'" + val.toString() + "'")