Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void writeAll(List<?> values) throws Exception {

// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
for (Object value : values) {
for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
Expand All @@ -191,6 +191,7 @@ public void writeAll(List<?> values) throws Exception {

try (WriteBatch batch = db().createWriteBatch()) {
while (valueIter.hasNext()) {
assert serializedValueIter.hasNext();
updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass,
naturalIndex, indices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void writeAll(List<?> values) throws Exception {

// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
for (Object value : values) {
for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
Expand All @@ -223,6 +223,7 @@ public void writeAll(List<?> values) throws Exception {

try (WriteBatch writeBatch = new WriteBatch()) {
while (valueIter.hasNext()) {
assert serializedValueIter.hasNext();
updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
naturalIndex, indices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -422,6 +423,37 @@ public void testResourceCleaner() throws Exception {
}
}

@Test
public void testMultipleTypesWriteAll() throws Exception {

List<CustomType1> type1List = Arrays.asList(
createCustomType1(1),
createCustomType1(2),
createCustomType1(3),
createCustomType1(4)
);

List<CustomType2> type2List = Arrays.asList(
createCustomType2(10),
createCustomType2(11),
createCustomType2(12),
createCustomType2(13)
);

List fullList = new ArrayList();
fullList.addAll(type1List);
fullList.addAll(type2List);

db.writeAll(fullList);
for (CustomType1 value : type1List) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to understand how this test fails. (Sorry I lost the whole context.)

Because if it's just that writing for the same entity happens multiple times, it depends on how we constuct the key (since LevelDB/RocksDB put is upsert). If the same entity creates the same key, I don't see how this could fail. Maybe even further, even if multiple writes come up with different keys, if we can read any of them, the result should be the same. Seems like I'm unaware of how this works.

Copy link
Contributor Author

@mridulm mridulm Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test, I purposely ensure both CustomType1 and CustomType2 are written together in a call to writeAll - and subsequently try to read them - which ends up with failing when deserializing CustomType2 (as CustomType1 was written to those records).

Without the fix, it results in deserialization error during read.
For LevelDBSuite, we see this for example - since it is writing CustomType1 instead of CustomType2 as value:

[ERROR] org.apache.spark.util.kvstore.LevelDBSuite.testMultipleTypesWriteAll -- Time elapsed: 0.024 s <<< ERROR!
com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "name" (class org.apache.spark.util.kvstore.CustomType2), not marked as ignorable (3 known properties: "id", "parentId", "key"])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 34] (through reference chain: org.apache.spark.util.kvstore.CustomType2["name"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1153)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2241)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1821)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1799)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:316)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4917)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3897)
        at org.apache.spark.util.kvstore.KVStoreSerializer.deserialize(KVStoreSerializer.java:70)
        at org.apache.spark.util.kvstore.LevelDB.get(LevelDB.java:136)
        at org.apache.spark.util.kvstore.LevelDB.read(LevelDB.java:148)
        at org.apache.spark.util.kvstore.LevelDBSuite.testMultipleTypesWriteAll(LevelDBSuite.java:452)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. I got the point. They are written with "wrong type information". Got it.

assertEquals(value, db.read(value.getClass(), value.key));
}
for (CustomType2 value : type2List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
}


private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand All @@ -432,6 +464,14 @@ private CustomType1 createCustomType1(int i) {
return t;
}

private CustomType2 createCustomType2(int i) {
CustomType2 t = new CustomType2();
t.key = "key" + i;
t.id = "id" + i;
t.parentId = "parent_id" + (i / 2);
return t;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -420,6 +421,36 @@ public void testResourceCleaner() throws Exception {
}
}

@Test
public void testMultipleTypesWriteAll() throws Exception {

List<CustomType1> type1List = Arrays.asList(
createCustomType1(1),
createCustomType1(2),
createCustomType1(3),
createCustomType1(4)
);

List<CustomType2> type2List = Arrays.asList(
createCustomType2(10),
createCustomType2(11),
createCustomType2(12),
createCustomType2(13)
);

List fullList = new ArrayList();
fullList.addAll(type1List);
fullList.addAll(type2List);

db.writeAll(fullList);
for (CustomType1 value : type1List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
for (CustomType2 value : type2List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand All @@ -430,6 +461,14 @@ private CustomType1 createCustomType1(int i) {
return t;
}

private CustomType2 createCustomType2(int i) {
CustomType2 t = new CustomType2();
t.key = "key" + i;
t.id = "id" + i;
t.parentId = "parent_id" + (i / 2);
return t;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down
Loading