-
Notifications
You must be signed in to change notification settings - Fork 11
Adding a new returnAndBulkUpsert API in the doc store. #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,7 +99,7 @@ public void testUpsert() throws IOException { | |
| query.setFilter(Filter.eq(ID, "default:testKey")); | ||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
| Assertions.assertFalse(documents.isEmpty()); | ||
|
|
@@ -119,7 +119,7 @@ public void testUpsertAndReturn() throws IOException { | |
| } | ||
|
|
||
| @Test | ||
| public void testBulkUpsert() throws IOException { | ||
| public void testBulkUpsert() { | ||
| Collection collection = datastore.getCollection(COLLECTION_NAME); | ||
| Map<Key, Document> bulkMap = new HashMap<>(); | ||
| bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob")); | ||
|
|
@@ -153,6 +153,51 @@ public void testBulkUpsert() throws IOException { | |
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testBulkUpsertAndReturn() throws IOException { | ||
| Collection collection = datastore.getCollection(COLLECTION_NAME); | ||
| Map<Key, Document> bulkMap = new HashMap<>(); | ||
| bulkMap.put(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob")); | ||
| bulkMap.put(new SingleValueKey("default", "testKey2"), createDocument("name", "Alice")); | ||
| bulkMap.put(new SingleValueKey("default", "testKey3"), createDocument("name", "Alice")); | ||
| bulkMap.put(new SingleValueKey("default", "testKey4"), createDocument("name", "Bob")); | ||
| bulkMap.put(new SingleValueKey("default", "testKey5"), createDocument("name", "Alice")); | ||
| bulkMap.put( | ||
| new SingleValueKey("default", "testKey6"), createDocument("email", "[email protected]")); | ||
|
|
||
| Iterator<Document> iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); | ||
| // Initially there shouldn't be any documents. | ||
| Assertions.assertFalse(iterator.hasNext()); | ||
|
|
||
| // The operation should be idempotent, so go ahead and try again. | ||
| iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); | ||
| List<Document> documents = new ArrayList<>(); | ||
| while (iterator.hasNext()) { | ||
| documents.add(iterator.next()); | ||
| } | ||
| Assertions.assertEquals(6, documents.size()); | ||
|
|
||
| { | ||
| // empty query returns all the documents | ||
| Query query = new Query(); | ||
| Assertions.assertEquals(6, collection.total(query)); | ||
| } | ||
|
|
||
| { | ||
| Query query = new Query(); | ||
| query.setFilter(Filter.eq("name", "Bob")); | ||
| Assertions.assertEquals(2, collection.total(query)); | ||
| } | ||
|
|
||
| { | ||
| // limit should not affect the total | ||
| Query query = new Query(); | ||
| query.setFilter(Filter.eq("name", "Bob")); | ||
| query.setLimit(1); | ||
| Assertions.assertEquals(2, collection.total(query)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testSubDocumentUpdate() throws IOException { | ||
| Collection collection = datastore.getCollection(COLLECTION_NAME); | ||
|
|
@@ -169,7 +214,7 @@ public void testSubDocumentUpdate() throws IOException { | |
| query.setFilter(Filter.eq(ID, "default:testKey")); | ||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
| Assertions.assertFalse(documents.isEmpty()); | ||
|
|
@@ -229,7 +274,7 @@ public void testDeleteAll() throws IOException { | |
| } | ||
|
|
||
| @Test | ||
| public void testDrop() throws IOException { | ||
| public void testDrop() { | ||
| Collection collection = datastore.getCollection(COLLECTION_NAME); | ||
|
|
||
| Assertions.assertTrue(datastore.listCollections().contains("postgres." + COLLECTION_NAME)); | ||
|
|
@@ -249,7 +294,7 @@ public void testIgnoreCaseLikeQuery() throws IOException { | |
| query.setFilter(new Filter(Filter.Op.LIKE, "name", searchValue)); | ||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
| Assertions.assertFalse(documents.isEmpty()); | ||
|
|
@@ -271,7 +316,7 @@ public void testSearch() throws IOException { | |
| query.setFilter(new Filter(Filter.Op.EQ, DOCUMENT_ID, key.toString())); | ||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
| Assertions.assertEquals(documents.size(), 1); | ||
|
|
@@ -291,7 +336,7 @@ public void testSearchForNestedKey() throws IOException { | |
| .setFilter(new Filter(Filter.Op.EQ, "attributes.span_id.value.string", "6449f1f720c93a67")); | ||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
| Assertions.assertEquals(documents.size(), 1); | ||
|
|
@@ -347,7 +392,7 @@ public void testOffsetLimitAndOrderBY() throws IOException { | |
|
|
||
| Iterator<Document> results = collection.search(query); | ||
| List<Document> documents = new ArrayList<>(); | ||
| for (; results.hasNext(); ) { | ||
| while (results.hasNext()) { | ||
| documents.add(results.next()); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Map.Entry; | ||
| import java.util.Set; | ||
| import java.util.function.Function; | ||
| import java.util.stream.Collectors; | ||
| import net.jodah.failsafe.Failsafe; | ||
|
|
@@ -386,28 +387,58 @@ public long total(Query query) { | |
| @Override | ||
| public boolean bulkUpsert(Map<Key, Document> documents) { | ||
| try { | ||
| List<UpdateOneModel<BasicDBObject>> bulkCollection = new ArrayList<>(); | ||
| for (Entry<Key, Document> entry : documents.entrySet()) { | ||
| Key key = entry.getKey(); | ||
| // insert or overwrite | ||
| bulkCollection.add(new UpdateOneModel<>( | ||
| this.selectionCriteriaForKey(key), | ||
| prepareUpsert(key, entry.getValue()), | ||
| new UpdateOptions().upsert(true))); | ||
| } | ||
|
|
||
| BulkWriteResult result = Failsafe.with(bulkWriteRetryPolicy) | ||
| .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false))); | ||
| BulkWriteResult result = bulkUpsertImpl(documents); | ||
| LOGGER.debug(result.toString()); | ||
|
|
||
| return true; | ||
|
|
||
| } catch (IOException | MongoServerException e) { | ||
| LOGGER.error("Error during bulk upsert for documents:{}", documents, e); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| private BulkWriteResult bulkUpsertImpl(Map<Key, Document> documents) throws JsonProcessingException { | ||
| List<UpdateOneModel<BasicDBObject>> bulkCollection = new ArrayList<>(); | ||
| for (Entry<Key, Document> entry : documents.entrySet()) { | ||
| Key key = entry.getKey(); | ||
| // insert or overwrite | ||
| bulkCollection.add(new UpdateOneModel<>( | ||
| this.selectionCriteriaForKey(key), | ||
| prepareUpsert(key, entry.getValue()), | ||
| new UpdateOptions().upsert(true))); | ||
| } | ||
|
|
||
| return Failsafe.with(bulkWriteRetryPolicy) | ||
| .get(() -> collection.bulkWrite(bulkCollection, new BulkWriteOptions().ordered(false))); | ||
| } | ||
|
|
||
| @Override | ||
| public Iterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document> documents) throws IOException { | ||
| try { | ||
| // First get all the documents for the given keys. | ||
| FindIterable<BasicDBObject> cursor = collection.find(selectionCriteriaForKeys(documents.keySet())); | ||
| final MongoCursor<BasicDBObject> mongoCursor = cursor.cursor(); | ||
|
|
||
| // Now go ahead and do the bulk upsert. | ||
| BulkWriteResult result = bulkUpsertImpl(documents); | ||
| LOGGER.debug(result.toString()); | ||
|
|
||
| return new Iterator<>() { | ||
| @Override | ||
| public boolean hasNext() { | ||
| return mongoCursor.hasNext(); | ||
| } | ||
|
|
||
| @Override | ||
| public Document next() { | ||
| return MongoCollection.this.dbObjectToDocument(mongoCursor.next()); | ||
| } | ||
| }; | ||
| } catch (JsonProcessingException e) { | ||
| LOGGER.error("Error during bulk upsert for documents:{}", documents, e); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is more of a general comment. I am usually in favour of either logging the error and handling it, or bubbling up the exception, but not both, because doing both usually floods the logs. Also, do we want to print the full set of documents in the logs? What about privacy concerns and efficient usage of log storage? I don't think dumping the failing documents in the logs is actionable either.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raised #25 |
||
| throw new IOException("Error during bulk upsert."); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not passing the previous exception as the cause makes us lose all the context on this error. Is there any reason for not doing it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was thinking the API should mask the implementation-specific exception details, but this is actually a library, so I'll fix it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raised #25 |
||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void drop() { | ||
| collection.drop(); | ||
|
|
@@ -417,6 +448,11 @@ private BasicDBObject selectionCriteriaForKey(Key key) { | |
| return new BasicDBObject(ID_KEY, key.toString()); | ||
| } | ||
|
|
||
| private BasicDBObject selectionCriteriaForKeys(Set<Key> keys) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can't this be private? A simple inspection tells me yes, it can, but I am not 100% sure, though.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you meant to ask about static right? Fixing it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I meant static, sorry.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raised #25 |
||
| return new BasicDBObject(Map.of(ID_KEY, new BasicDBObject("$in", | ||
| keys.stream().map(Key::toString).collect(Collectors.toList())))); | ||
| } | ||
|
|
||
| private Document dbObjectToDocument(BasicDBObject dbObject) { | ||
| try { | ||
| // Hack: Remove the _id field since it's an unrecognized field for Proto layer. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about the performance of this. Not a Javaer here, but it seems that whether or not logger debug is enabled, we still turn it into a string? cc @kotharironak
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 We should always let the logger do the stringification for us so we don't have to eat this cost unless the message is needed. That means wrapping it in an `if` or, IMO more gracefully, doing `LOGGER.debug("{}", result);`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing in a new PR. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#25