Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ public boolean equals(Object o) {
return false;
} else if (!name.equals(that.name)) {
return false;
} else if (!Objects.equals(defaultValue, that.defaultValue)) {
} else if (!Objects.equals(defaultValue, that.defaultValue) &&
!Arrays.equals((byte[]) defaultValue, (byte[]) that.defaultValue)) {
return false;
} else if (!Objects.equals(doc, that.doc)) {
return false;
Expand Down
172 changes: 17 additions & 155 deletions core/src/main/java/org/apache/iceberg/SchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,18 @@

package org.apache.iceberg;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.util.internal.JacksonUtils;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -69,22 +63,15 @@ private SchemaParser() {
private static final String VALUE_REQUIRED = "value-required";
private static final String DEFAULT = "default";

private static final List<Class> primitiveClasses = Arrays.asList(Boolean.class, Integer.class, Long.class,
Float.class, Double.class, CharSequence.class, String.class, java.util.UUID.class, BigDecimal.class);

private static void writeDefaultValue(Object defaultValue, Type type, JsonGenerator generator) throws IOException {
if (defaultValue == null) {
return;
}
generator.writeFieldName(DEFAULT);
if (type.isListType()) {
generator.writeString(defaultValueToJsonString((List<Object>) defaultValue));
} else if (type.isStructType() || type.isMapType()) {
generator.writeString(defaultValueToJsonString((Map<String, Object>) defaultValue));
} else if (isFixedOrBinary(type)) {
generator.writeString(defaultValueToJsonString((byte[]) defaultValue));
if (isFixedOrBinary(type)) {
generator.writeRawValue(defaultValueToJsonString((byte[]) defaultValue));
} else {
generator.writeString(defaultValueToJsonString(defaultValue));
generator.writeRawValue(defaultValueToJsonString(defaultValue));
}
}

Expand All @@ -100,8 +87,7 @@ static void toJson(Types.StructType struct, JsonGenerator generator) throws IOEx
generator.writeBooleanField(REQUIRED, field.isRequired());
generator.writeFieldName(TYPE);
toJson(field.type(), generator);
// BDP-11826: Disable serializing default value
// writeDefaultValue(field.getDefaultValue(), field.type(), generator);
writeDefaultValue(field.getDefaultValue(), field.type(), generator);
if (field.doc() != null) {
generator.writeStringField(DOC, field.doc());
}
Expand Down Expand Up @@ -217,21 +203,16 @@ private static Object defaultValueFromJson(JsonNode field, Type type) {
return null;
}

String defaultValueString = field.get(DEFAULT).asText();

if (isFixedOrBinary(type)) {
return defaultValueFromJsonBytesField(defaultValueString);
}

if (type.isPrimitiveType()) {
return primitiveDefaultValueFromJsonString(defaultValueString, type);
try {
return field.get(DEFAULT).binaryValue();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

try {
return defaultValueFromJsonString(defaultValueString, type);
} catch (IOException e) {
throw new RuntimeException(e);
}
return AvroSchemaUtil.convertToJavaDefaultValue(JacksonUtils.toObject(field.get(DEFAULT),
AvroSchemaUtil.convert(type)));
}

private static Types.StructType structFromJson(JsonNode json) {
Expand All @@ -249,14 +230,13 @@ private static Types.StructType structFromJson(JsonNode json) {
int id = JsonUtil.getInt(ID, field);
String name = JsonUtil.getString(NAME, field);
Type type = typeFromJson(field.get(TYPE));
// BDP-11826: Disable deserializing default value
// Object defaultValue = defaultValueFromJson(field, type);
Object defaultValue = defaultValueFromJson(field, type);
String doc = JsonUtil.getStringOrNull(DOC, field);
boolean isRequired = JsonUtil.getBool(REQUIRED, field);
if (isRequired) {
fields.add(Types.NestedField.required(id, name, type, doc));
fields.add(Types.NestedField.required(id, name, type, defaultValue, doc));
} else {
fields.add(Types.NestedField.optional(id, name, type, doc));
fields.add(Types.NestedField.optional(id, name, type, defaultValue, doc));
}
}

Expand Down Expand Up @@ -313,19 +293,6 @@ public static Schema fromJson(String json) {
});
}

/**
 * Serializes a map-shaped default value to its intermediate JSON string.
 *
 * <p>Every entry value is first converted to its own JSON string, and the resulting
 * string-to-string map is then serialized as a whole.
 *
 * @param map default value keyed by name; entries are emitted in the map's iteration order
 * @return JSON string representing the map
 */
private static String defaultValueToJsonString(Map<String, Object> map) {
  // LinkedHashMap keeps the entries in the order the input map yields them.
  Map<String, String> serializedEntries = new LinkedHashMap<>();
  for (Map.Entry<String, Object> entry : map.entrySet()) {
    Object entryValue = entry.getValue();
    serializedEntries.put(entry.getKey(), defaultValueToJsonString(entryValue));
  }
  // The argument is statically a Map<String, String>, which is not a Map<String, Object>,
  // so this call binds to the Object overload rather than recursing into this method.
  return defaultValueToJsonString(serializedEntries);
}

/**
 * Serializes a list-shaped default value to its intermediate JSON string.
 *
 * <p>Every item is first converted to its own JSON string, and the resulting list of
 * strings is then serialized as a whole.
 *
 * @param list default value items, in order
 * @return JSON string representing the list
 */
private static String defaultValueToJsonString(List<Object> list) {
  List<String> serializedItems = new ArrayList<>();
  for (Object item : list) {
    serializedItems.add(defaultValueToJsonString(item));
  }
  // The argument is statically a List<String>, which is not a List<Object>,
  // so this call binds to the Object overload rather than recursing into this method.
  return defaultValueToJsonString(serializedItems);
}

private static String defaultValueToJsonString(byte[] bytes) {
try {
return JsonUtil.mapper().writeValueAsString(ByteBuffer.wrap(bytes));
Expand All @@ -335,115 +302,10 @@ private static String defaultValueToJsonString(byte[] bytes) {
}

private static String defaultValueToJsonString(Object value) {
if (value == null) {
// Json string representation of null object is string "null"
return "null";
}
if (isPrimitiveClass(value)) {
return value.toString();
}

try {
return JsonUtil.mapper().writeValueAsString(new SerDeValue(value));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private static boolean isPrimitiveClass(Object value) {
return primitiveClasses.contains(value.getClass());
}

private static Object defaultValueFromJsonBytesField(String value) {
try {
return JsonUtil.mapper().readValue(value, ByteBuffer.class).array();
return JsonUtil.mapper().writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

/**
 * Rebuilds a non-primitive (list, map or struct) default value from its intermediate JSON string.
 *
 * <p>The string is read as a {@code SerDeValue} wrapper whose payload must be a {@link List} for
 * list types or a {@link Map} for map and struct types. Each item / entry value of that payload is
 * itself a JSON string and is decoded according to the corresponding element type.
 *
 * @param jsonString serialized default value
 * @param type the non-primitive type the default value belongs to
 * @return a {@code List<Object>} for list types, otherwise a {@code Map<String, Object>}
 * @throws IOException if the JSON cannot be read
 * @throws IllegalArgumentException if {@code type} is primitive, if the decoded payload does not
 *     match the shape required by {@code type}, or if a struct key has no matching field
 */
private static Object defaultValueFromJsonString(String jsonString, Type type) throws IOException {
  Preconditions.checkArgument(!type.isPrimitiveType(), "jsonString %s is for primitive type %s", jsonString, type);
  Object jsonStringCollection = JsonUtil.mapper().readValue(jsonString, SerDeValue.class).getValue();

  if (type.isListType()) {
    Preconditions.checkArgument(jsonStringCollection instanceof List,
        "deserialized Json object: (%s) is not List for List type", jsonStringCollection);
    Type elementType = type.asListType().elementType();
    // Items are expected to be JSON strings; a non-String item still fails with ClassCastException
    // while iterating, exactly as before.
    @SuppressWarnings("unchecked")
    List<String> jsonStringItems = (List<String>) jsonStringCollection;
    List<Object> list = new ArrayList<>();
    for (String item : jsonStringItems) {
      list.add(elementDefaultValueFromJsonString(item, elementType));
    }
    return list;
  }

  Preconditions.checkArgument((type.isMapType() || type.isStructType()) && jsonStringCollection instanceof Map,
      "deserialized Json object: (%s) is not Map for type: %s", jsonStringCollection, type);

  // map (MapType or StructType) case
  // Iterate through the Map interface: the check above only guarantees "instanceof Map", so casting
  // to a concrete HashMap could fail for other Map implementations.
  Map<String, Object> map = new HashMap<>();
  for (Map.Entry<?, ?> entry : ((Map<?, ?>) jsonStringCollection).entrySet()) {
    String key = entry.getKey().toString();
    String valueString = entry.getValue().toString();
    Type elementType;
    if (type.isMapType()) {
      elementType = type.asMapType().valueType();
    } else {
      // NOTE(review): assumes field(name) returns null for an unknown name - confirm. The check
      // turns that case into a descriptive error instead of a bare NullPointerException.
      Types.NestedField structField = type.asStructType().field(key);
      Preconditions.checkArgument(structField != null, "Cannot find field %s in struct type %s", key, type);
      elementType = structField.type();
    }
    map.put(key, elementDefaultValueFromJsonString(valueString, elementType));
  }
  return map;
}

/**
 * Decodes one list item or map/struct entry value: primitives are parsed from their string form,
 * anything else is unwrapped from its {@code SerDeValue} JSON representation.
 */
private static Object elementDefaultValueFromJsonString(String jsonString, Type elementType) throws IOException {
  if (elementType.isPrimitiveType()) {
    return primitiveDefaultValueFromJsonString(jsonString, elementType);
  }
  return JsonUtil.mapper().readValue(jsonString, SerDeValue.class).getValue();
}

/**
 * Parses the string form of a primitive default value into the Java object for {@code type}.
 *
 * @param jsonString string form of the value
 * @param type primitive type that selects the parser
 * @return Boolean, Integer (also for DATE), Long (also for TIME and TIMESTAMP), Float, Double,
 *     BigDecimal, String or UUID; FIXED and BINARY are delegated to
 *     {@code defaultValueFromJsonBytesField}
 * @throws NumberFormatException if a numeric value cannot be parsed
 * @throws IllegalArgumentException if a UUID value is malformed
 * @throws RuntimeException if {@code type} is not one of the handled primitive type ids
 */
private static Object primitiveDefaultValueFromJsonString(String jsonString, Type type) {
  switch (type.typeId()) {
    case BOOLEAN:
      return Boolean.valueOf(jsonString);
    case INTEGER:
    case DATE:
      return Integer.valueOf(jsonString);
    case DECIMAL:
      // Parse the full decimal text. Going through Long rejected any value with a fractional
      // part or exponent (e.g. "1.50"), although BigDecimal.toString() produces such strings.
      return new BigDecimal(jsonString);
    case LONG:
    case TIME:
    case TIMESTAMP:
      return Long.valueOf(jsonString);
    case FLOAT:
      return Float.valueOf(jsonString);
    case DOUBLE:
      return Double.valueOf(jsonString);
    case STRING:
      return jsonString;
    case UUID:
      return java.util.UUID.fromString(jsonString);
    case FIXED:
    case BINARY:
      return defaultValueFromJsonBytesField(jsonString);
    default:
      throw new RuntimeException("non-primitive type: " + type);
  }
}

/**
 * Single-property envelope for Jackson (de)serialization of default values.
 *
 * <p>The wrapped object is written to, and read back from, a JSON object with one
 * property named {@code __value__}, instead of being serialized bare.
 */
private static class SerDeValue {
  // JSON property under which the wrapped object is stored
  private static final String VALUE_FIELD = "__value__";

  @JsonProperty(VALUE_FIELD)
  private final Object value;

  /** Creates the envelope; Jackson uses this constructor when reading the {@code __value__} property. */
  @JsonCreator
  private SerDeValue(@JsonProperty(VALUE_FIELD) Object wrapped) {
    this.value = wrapped;
  }

  /** Returns the wrapped object. */
  private Object getValue() {
    return this.value;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ static boolean hasNonNullDefaultValue(Schema.Field field) {
!(field.defaultVal() instanceof String && ((String) field.defaultVal()).equalsIgnoreCase("null"));
}

static Object convertToJavaDefaultValue(Object defaultValue) {
public static Object convertToJavaDefaultValue(Object defaultValue) {
if (defaultValue instanceof List) {
return ((List<?>) defaultValue).stream()
.map(AvroSchemaUtil::convertToJavaDefaultValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.NestedField;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import static org.apache.avro.Schema.Type.BOOLEAN;
Expand All @@ -43,7 +42,7 @@
import static org.apache.avro.Schema.Type.NULL;
import static org.apache.avro.Schema.Type.STRING;

@Ignore("BDP-11826: Disable default value preserving in iceberg schema")

public class TestSchemaParserForDefaultValues {

private void assertEqualStructs(org.apache.iceberg.Schema expected, org.apache.iceberg.Schema actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.apache.iceberg.avro;

import java.util.List;
import java.util.stream.IntStream;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -434,4 +436,84 @@ public void testConversionOfRecordDefaultWithOptionalNestedField2() {
"default value: {mapField={foo=bar, x=y}, recordField=null}, \n" +
"}", iSchema.toString());
}

/**
 * Converts an Avro record with defaults of several kinds (string, int, map, array, nested record,
 * map of arrays, fixed) to an Iceberg schema, serializes it with SchemaParser.toJson, parses it
 * back with SchemaParser.fromJson, and checks that every column is equal to the original one.
 */
@Test
public void testVariousTypesDefaultValues() {
  String schemaString = "{\n" +
      " \"namespace\": \"com.razhang\",\n" +
      " \"type\": \"record\",\n" +
      " \"name\": \"RAZHANG\",\n" +
      " \"fields\": [\n" +
      " {\n" +
      " \"name\": \"f1\",\n" +
      " \"type\": \"string\",\n" +
      " \"default\": \"foo\"\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f2\",\n" +
      " \"type\": \"int\",\n" +
      " \"default\": 1\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f3\",\n" +
      " \"type\": {\n" +
      " \"type\": \"map\",\n" +
      " \"values\" : \"int\"\n" +
      " },\n" +
      " \"default\": {\"a\": 1}\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f4\",\n" +
      " \"type\": {\n" +
      " \"type\": \"array\",\n" +
      " \"items\" : \"int\"\n" +
      " },\n" +
      " \"default\": [1, 2, 3]\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f5\",\n" +
      " \"type\": {\n" +
      " \"type\": \"record\",\n" +
      " \"name\": \"F5\",\n" +
      " \"fields\" : [\n" +
      " {\"name\": \"ff1\", \"type\": \"long\"},\n" +
      " {\"name\": \"ff2\", \"type\": [\"null\", \"string\"]}\n" +
      " ]\n" +
      " },\n" +
      " \"default\": {\n" +
      " \"ff1\": 999,\n" +
      " \"ff2\": null\n" +
      " }\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f6\",\n" +
      " \"type\": {\n" +
      " \"type\": \"map\",\n" +
      " \"values\": {\n" +
      " \"type\": \"array\",\n" +
      " \"items\" : \"int\"\n" +
      " }\n" +
      " },\n" +
      " \"default\": {\"key\": [1, 2, 3]}\n" +
      " },\n" +
      " {\n" +
      " \"name\": \"f7\",\n" +
      " \"type\": {\n" +
      " \"type\": \"fixed\",\n" +
      " \"name\": \"md5\",\n" +
      " \"size\": 2\n" +
      " },\n" +
      " \"default\": \"fF\"\n" +
      " }\n" +
      " ]\n" +
      "}";
  Schema schema = new Schema.Parser().parse(schemaString);
  org.apache.iceberg.Schema iSchema = AvroSchemaUtil.toIceberg(schema);

  String schemaJson = SchemaParser.toJson(iSchema);
  org.apache.iceberg.Schema roundTripSchema = SchemaParser.fromJson(schemaJson);

  // Compare the counts first: iterating only over the round-tripped columns would pass
  // vacuously if the round trip dropped columns.
  Assert.assertEquals("Round-tripped schema should have the same number of columns",
      iSchema.columns().size(), roundTripSchema.columns().size());

  // Compare column by column so that a failure names the offending position.
  IntStream.range(0, iSchema.columns().size()).forEach(i ->
      Assert.assertEquals("Column at position " + i + " should survive the JSON round trip",
          iSchema.columns().get(i), roundTripSchema.columns().get(i)));
}
}