diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java index 38fcd9f88511e..1da7ca54be4da 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; @@ -58,7 +57,6 @@ public class TestHoodieFileGroupReaderOnHive extends HoodieFileGroupReaderOnJavaTestBase { private static final String PARTITION_COLUMN = "datestr"; - private static JobConf baseJobConf; private static HdfsTestService hdfsTestService; private static HoodieStorage storage; private static FileSystem fs; @@ -74,7 +72,7 @@ public static void setUpClass() throws IOException { hdfsTestService = new HdfsTestService(); fs = hdfsTestService.start(true).getFileSystem(); storageConf = HoodieTestUtils.getDefaultStorageConf(); - baseJobConf = new JobConf(storageConf.unwrap()); + JobConf baseJobConf = new JobConf(storageConf.unwrap()); baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); fs.setConf(baseJobConf); storage = new HoodieHadoopStorage(fs); @@ -100,8 +98,7 @@ public HoodieReaderContext getHoodieReaderContext(String tablePat JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class)); setupJobconf(jobConf, avroSchema); return new HiveHoodieReaderContext(readerCreator, - getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema), - new ObjectInspectorCache(avroSchema, jobConf), storageConf, metaClient.getTableConfig()); + getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema), storageConf, metaClient.getTableConfig()); } @Override @@ -116,24 +113,7 @@ public void assertRecordMatchesSchema(Schema schema, ArrayWritable record) { @Override public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() { - HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs(); - configs.nestedSupport = false; - configs.arraySupport = false; - configs.mapSupport = false; - configs.addNewFieldSupport = false; - configs.intToLongSupport = false; - configs.intToFloatSupport = false; - configs.intToDoubleSupport = false; - configs.intToStringSupport = false; - configs.longToFloatSupport = false; - configs.longToDoubleSupport = false; - configs.longToStringSupport = false; - configs.floatToDoubleSupport = false; - configs.floatToStringSupport = false; - configs.doubleToStringSupport = false; - configs.stringToBytesSupport = false; - configs.bytesToStringSupport = false; - return configs; + return new HoodieTestDataGenerator.SchemaEvolutionConfigs(); } private void setupJobconf(JobConf jobConf, Schema schema) { diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java index d32da6179661a..70f021610f999 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java @@ -227,9 +227,15 @@ public static void assertArrayWritableMatchesSchema(Schema schema, Writable writ case UNION: if (schema.getTypes().size() == 2 && schema.getTypes().get(0).getType() == Schema.Type.NULL) { + if (writable == null || writable instanceof NullWritable) { + return; + } assertArrayWritableMatchesSchema(schema.getTypes().get(1), writable); } else if (schema.getTypes().size() == 2 && schema.getTypes().get(1).getType() == Schema.Type.NULL) { + if (writable == null || writable instanceof NullWritable) { + return; + } assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable); } else if (schema.getTypes().size() == 1) { assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForRecordProjection.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForRecordProjection.java new file mode 100644 index 0000000000000..73ea743c050a8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForRecordProjection.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.avro; + +import org.apache.avro.Schema; + +import java.util.List; + +import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; + +public class AvroSchemaComparatorForRecordProjection extends AvroSchemaComparatorForSchemaEvolution { + + private static final AvroSchemaComparatorForRecordProjection INSTANCE = new AvroSchemaComparatorForRecordProjection(); + + public static boolean areSchemasProjectionEquivalent(Schema s1, Schema s2) { + return INSTANCE.schemaEqualsInternal(s1, s2); + } + + @Override + protected boolean schemaEqualsInternal(Schema s1, Schema s2) { + if (s1 == s2) { + return true; + } + if (s1 == null || s2 == null) { + return false; + } + return super.schemaEqualsInternal(resolveNullableSchema(s1), resolveNullableSchema(s2)); + } + + @Override + protected boolean validateRecord(Schema s1, Schema s2) { + return true; + } + + @Override + protected boolean validateField(Schema.Field f1, Schema.Field f2) { + return f1.name().equalsIgnoreCase(f2.name()); + } + + @Override + protected boolean enumSchemaEquals(Schema s1, Schema s2) { + List symbols1 = s1.getEnumSymbols(); + List symbols2 = s2.getEnumSymbols(); + if (symbols1.size() > symbols2.size()) { + return false; + } + + for (int i = 0; i < symbols1.size(); i++) { + if (!symbols1.get(i).equalsIgnoreCase(symbols2.get(i))) { + return false; + } + } + return true; + } + + @Override + protected boolean unionSchemaEquals(Schema s1, Schema s2) { + throw new UnsupportedOperationException("union not supported for projection equivalence"); + } + + @Override + protected boolean validateFixed(Schema s1, Schema s2) { + return s1.getFixedSize() == s2.getFixedSize(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForSchemaEvolution.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForSchemaEvolution.java index b30ad4f0f1b9d..3cd3a7025113b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForSchemaEvolution.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaComparatorForSchemaEvolution.java @@ -23,9 +23,7 @@ import org.apache.avro.Schema; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -140,10 +138,16 @@ */ public class AvroSchemaComparatorForSchemaEvolution { - private AvroSchemaComparatorForSchemaEvolution() { + protected AvroSchemaComparatorForSchemaEvolution() { } + private static final AvroSchemaComparatorForSchemaEvolution VALIDATOR = new AvroSchemaComparatorForSchemaEvolution(); + public static boolean schemaEquals(Schema s1, Schema s2) { + return VALIDATOR.schemaEqualsInternal(s1, s2); + } + + protected boolean schemaEqualsInternal(Schema s1, Schema s2) { if (s1 == s2) { return true; } @@ -181,11 +185,19 @@ public static boolean schemaEquals(Schema s1, Schema s2) { } } - private static boolean recordSchemaEquals(Schema s1, Schema s2) { + protected boolean validateRecord(Schema s1, Schema s2) { if (s1.isError() != s2.isError()) { return false; } + return logicalTypeSchemaEquals(s1, s2); + } + + private boolean recordSchemaEquals(Schema s1, Schema s2) { + if (!validateRecord(s1, s2)) { + return false; + } + List fields1 = s1.getFields(); List fields2 = s2.getFields(); @@ -193,23 +205,15 @@ private static boolean recordSchemaEquals(Schema s1, Schema s2) { return false; } - Map fieldMap1 = fields1.stream() - .collect(Collectors.toMap(Schema.Field::name, Function.identity())); - - for 
(Schema.Field f2 : fields2) { - Schema.Field f1 = fieldMap1.get(f2.name()); - if (f1 == null) { - return false; - } - if (!fieldEquals(f1, f2)) { + for (int i = 0; i < fields1.size(); i++) { + if (!fieldEquals(fields1.get(i), fields2.get(i))) { return false; } } - - return logicalTypeSchemaEquals(s1, s2); + return true; } - private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) { + protected boolean validateField(Schema.Field f1, Schema.Field f2) { if (!f1.name().equals(f2.name())) { return false; } @@ -223,10 +227,6 @@ private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) { return false; } - if (!schemaEquals(f1.schema(), f2.schema())) { - return false; - } - // If both have default values, they must be equal if (f1.hasDefaultValue() && !f1.defaultVal().equals(f2.defaultVal())) { return false; @@ -235,7 +235,15 @@ private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) { return true; } - private static boolean enumSchemaEquals(Schema s1, Schema s2) { + private boolean fieldEquals(Schema.Field f1, Schema.Field f2) { + if (!validateField(f1, f2)) { + return false; + } + + return schemaEqualsInternal(f1.schema(), f2.schema()); + } + + protected boolean enumSchemaEquals(Schema s1, Schema s2) { // Check name equality first if (!s1.getName().equals(s2.getName())) { return false; @@ -252,7 +260,7 @@ private static boolean enumSchemaEquals(Schema s1, Schema s2) { return symbols1.equals(symbols2); } - private static boolean unionSchemaEquals(Schema s1, Schema s2) { + protected boolean unionSchemaEquals(Schema s1, Schema s2) { List types1 = s1.getTypes(); List types2 = s2.getTypes(); @@ -268,17 +276,23 @@ private static boolean unionSchemaEquals(Schema s1, Schema s2) { return set1.equals(set2); } - private static boolean arraySchemaEquals(Schema s1, Schema s2) { - return schemaEquals(s1.getElementType(), s2.getElementType()); + private boolean arraySchemaEquals(Schema s1, Schema s2) { + return schemaEqualsInternal(s1.getElementType(), s2.getElementType()); + } + + private boolean mapSchemaEquals(Schema s1, Schema s2) { + return schemaEqualsInternal(s1.getValueType(), s2.getValueType()); } - private static boolean mapSchemaEquals(Schema s1, Schema s2) { - return schemaEquals(s1.getValueType(), s2.getValueType()); + protected boolean validateFixed(Schema s1, Schema s2) { + return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize(); } - private static boolean fixedSchemaEquals(Schema s1, Schema s2) { - return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize() - && logicalTypeSchemaEquals(s1, s2); + private boolean fixedSchemaEquals(Schema s1, Schema s2) { + if (!validateFixed(s1, s2)) { + return false; + } + return logicalTypeSchemaEquals(s1, s2); } private static boolean primitiveSchemaEquals(Schema s1, Schema s2) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index bb153446d0e35..689c88780d9c3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -376,6 +376,95 @@ public static Schema createNewSchemaFromFieldsWithReference(Schema schema, List< return newSchema; } + /** + * If schemas are projection equivalent, then a record with schema1 does not need to be projected to schema2 + * because the projection will be the identity. 
+ * + * Two schemas are considered projection equivalent if the field names and types are equivalent. + * The names of records, namespaces, or docs do not need to match. Nullability is ignored. + */ + public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema schema2) { + return AvroSchemaComparatorForRecordProjection.areSchemasProjectionEquivalent(schema1, schema2); + } + + /** + * Prunes a data schema to match the structure of a required schema while preserving + * original metadata where possible. + * + *
<p>This method recursively traverses both schemas and creates a new schema that:
+ * <ul>
+ *   <li>Contains only fields present in the required schema</li>
+ *   <li>Preserves field metadata (type, documentation, default values) from the data schema</li>
+ *   <li>Optionally includes mandatory fields from the required schema even if they are missing from the data schema</li>
+ * </ul>
+ * + * @param dataSchema the source schema containing the original data structure and metadata + * @param requiredSchema the target schema that defines the desired structure and field requirements + * @param mandatoryFields a set of top level field names that should be included from the required schema + * even if they don't exist in the data schema. This allows for fields like cdc operation + * don't exist in the data schema + * + * @return a new pruned schema that matches the required schema structure while preserving + * data schema metadata where possible + */ + public static Schema pruneDataSchema(Schema dataSchema, Schema requiredSchema, Set mandatoryFields) { + Schema prunedDataSchema = pruneDataSchemaInternal(resolveNullableSchema(dataSchema), resolveNullableSchema(requiredSchema), mandatoryFields); + if (dataSchema.isNullable() && !prunedDataSchema.isNullable()) { + return createNullableSchema(prunedDataSchema); + } + return prunedDataSchema; + } + + private static Schema pruneDataSchemaInternal(Schema dataSchema, Schema requiredSchema, Set mandatoryFields) { + switch (requiredSchema.getType()) { + case RECORD: + if (dataSchema.getType() != Schema.Type.RECORD) { + throw new IllegalArgumentException("Data schema is not a record"); + } + List newFields = new ArrayList<>(); + for (Schema.Field requiredSchemaField : requiredSchema.getFields()) { + Schema.Field dataSchemaField = dataSchema.getField(requiredSchemaField.name()); + if (dataSchemaField != null) { + Schema.Field newField = new Schema.Field( + dataSchemaField.name(), + pruneDataSchema(dataSchemaField.schema(), requiredSchemaField.schema(), Collections.emptySet()), + dataSchemaField.doc(), + dataSchemaField.defaultVal() + ); + newFields.add(newField); + } else if (mandatoryFields.contains(requiredSchemaField.name())) { + newFields.add(new Schema.Field( + requiredSchemaField.name(), + requiredSchemaField.schema(), + requiredSchemaField.doc(), + requiredSchemaField.defaultVal() + )); + } + } + Schema newRecord = Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(), dataSchema.getNamespace(), false); + newRecord.setFields(newFields); + return newRecord; + + case ARRAY: + if (dataSchema.getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Data schema is not an array"); + } + return Schema.createArray(pruneDataSchema(dataSchema.getElementType(), requiredSchema.getElementType(), Collections.emptySet())); + + case MAP: + if (dataSchema.getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Data schema is not a map"); + } + return Schema.createMap(pruneDataSchema(dataSchema.getValueType(), requiredSchema.getValueType(), Collections.emptySet())); + + case UNION: + throw new IllegalArgumentException("Data schema is a union"); + + default: + return dataSchema; + } + } + /** * Passed in {@code Union} schema and will try to resolve the field with the {@code fieldSchemaFullName} * w/in the union returning its corresponding schema diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 2019f5bcf257a..0de9d4dd62f2d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1084,7 +1084,7 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, GenericData.Record newRecord = new GenericData.Record(newSchema); // if no renaming, skip building the full name string. 
boolean noFieldsRenaming = renameCols.isEmpty(); - String namePrefix = createNamePredix(noFieldsRenaming, fieldNames); + String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames); for (int i = 0; i < newSchema.getFields().size(); i++) { Schema.Field newField = newSchema.getFields().get(i); String newFieldName = newField.name(); @@ -1146,12 +1146,11 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, } } - @VisibleForTesting - public static String createNamePredix(boolean noFieldsRenaming, Deque fieldNames) { - return noFieldsRenaming ? null : fieldNames.isEmpty() ? null : createFullName(fieldNames); + public static String createNamePrefix(boolean noFieldsRenaming, Deque fieldNames) { + return noFieldsRenaming || fieldNames.isEmpty() ? null : createFullName(fieldNames); } - private static String getOldFieldNameWithRenaming(String namePrefix, String newFieldName, Map renameCols) { + public static String getOldFieldNameWithRenaming(String namePrefix, String newFieldName, Map renameCols) { String renamed = renameCols.get(compositeName(namePrefix, newFieldName)); return renamed == null ? newFieldName : renamed; } @@ -1411,10 +1410,8 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ case RECORD: for (Schema.Field field : readerSchema.getFields()) { Schema.Field writerField = writerSchema.getField(field.name()); - if (writerField != null) { - if (recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), field.schema())) { - return true; - } + if (writerField == null || recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), field.schema())) { + return true; } } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java index 6e4b81bce3b9d..a05fa38b29657 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.read; import org.apache.hudi.avro.AvroSchemaCache; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; @@ -114,7 +115,7 @@ public Option getInternalSchemaOpt() { } public Option> getOutputConverter() { - if (!requestedSchema.equals(requiredSchema)) { + if (!AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, requestedSchema)) { return Option.of(readerContext.projectRecord(requiredSchema, requestedSchema)); } return Option.empty(); diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java index 27d29f83f05b7..a4480225cab06 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java @@ -23,17 +23,24 @@ import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Arrays; import 
java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.AssertionsKt.assertNotNull; public class TestAvroSchemaUtils { @@ -419,4 +426,351 @@ public void testFindNestedFieldType() { assertThrows(HoodieAvroSchemaException.class, () -> AvroSchemaUtils.findNestedFieldType(sourceSchema, "nested_record.bool")); assertThrows(HoodieAvroSchemaException.class, () -> AvroSchemaUtils.findNestedFieldType(sourceSchema, "non_present_field.also_not_present")); } + + private static Schema parse(String json) { + return new Schema.Parser().parse(json); + } + + @Test + void testAreSchemasProjectionEquivalentRecordSchemas() { + Schema s1 = parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}"); + Schema s2 = parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}"); + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentFieldCountInRecords() { + Schema s1 = parse("{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]}"); + Schema s2 = parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[]}"); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentNestedRecordSchemas() { + Schema s1 = parse("{\"type\":\"record\",\"name\":\"Outer1\",\"fields\":[{\"name\":\"inner\"," + + "\"type\":{\"type\":\"record\",\"name\":\"Inner1\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}"); + Schema s2 = parse("{\"type\":\"record\",\"name\":\"Outer2\",\"fields\":[{\"name\":\"inner\"," + + "\"type\":{\"type\":\"record\",\"name\":\"Inner2\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}"); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentArraySchemas() { + Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema s2 = Schema.createArray(Schema.create(Schema.Type.STRING)); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentElementTypeInArray() { + Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema s2 = Schema.createArray(Schema.create(Schema.Type.INT)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentMapSchemas() { + Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG)); + Schema s2 = Schema.createMap(Schema.create(Schema.Type.LONG)); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentMapValueTypes() { + Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG)); + Schema s2 = Schema.createMap(Schema.create(Schema.Type.STRING)); + s1.addProp("prop1", "value1"); // prevent 
Objects.equals from returning true + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentNullableSchemaComparison() { + Schema s1 = AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.INT)); + Schema s2 = Schema.create(Schema.Type.INT); + s2.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentListVsString() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema listSchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(listSchema, stringSchema)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(stringSchema, listSchema)); + } + + @Test + void testAreSchemasProjectionEquivalentMapVsString() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema mapSchema = Schema.createMap(Schema.create(Schema.Type.STRING)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(mapSchema, stringSchema)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(stringSchema, mapSchema)); + } + + @Test + void testAreSchemasProjectionEquivalentEqualFixedSchemas() { + Schema s1 = Schema.createFixed("F", null, null, 16); + Schema s2 = Schema.createFixed("F", null, null, 16); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentFixedSize() { + Schema s1 = Schema.createFixed("F", null, null, 8); + Schema s2 = Schema.createFixed("F", null, null, 4); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentEnums() { + Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B", "C")); + Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B", "C")); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentEnumSymbols() { + Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("X", "Y")); + Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B")); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentEnumSymbolSubset() { + Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B")); + Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B", "C")); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s2, s1)); + } + + @Test + void testAreSchemasProjectionEquivalentEqualDecimalLogicalTypes() { + Schema s1 = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(12, 2).addToSchema(s1); + + Schema s2 = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(12, 2).addToSchema(s2); + s1.addProp("prop1", "value1"); // prevent Objects.equals from returning true + + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentDifferentPrecision() { + Schema s1 = 
Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(12, 2).addToSchema(s1); + + Schema s2 = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(13, 2).addToSchema(s2); + + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentLogicalVsNoLogicalType() { + Schema s1 = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(10, 2).addToSchema(s1); + + Schema s2 = Schema.create(Schema.Type.BYTES); + + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2)); + } + + @Test + void testAreSchemasProjectionEquivalentSameReferenceSchema() { + Schema s = Schema.create(Schema.Type.STRING); + assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s, s)); + } + + @Test + void testAreSchemasProjectionEquivalentNullSchemaComparison() { + Schema s = Schema.create(Schema.Type.STRING); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(null, s)); + assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s, null)); + } + + @Test + void testPruneRecordFields() { + String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\", \"fields\": [" + + "{ \"name\": \"name\", \"type\": \"string\" }," + + "{ \"name\": \"age\", \"type\": \"int\" }," + + "{ \"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null }" + + "]}"; + + String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\", \"fields\": [" + + "{ \"name\": \"name\", \"type\": \"string\" }" + + "]}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = parse(requiredSchemaStr); + + Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema, requiredSchema, Collections.emptySet()); + + assertEquals(1, pruned.getFields().size()); + assertEquals("name", pruned.getFields().get(0).name()); + } + + @Test + void testPruningPreserveNullable() { + String dataSchemaStr = "{" + + "\"type\": \"record\"," + + "\"name\": \"Container\"," + + "\"fields\": [" + + " {" + + " \"name\": \"foo\"," + + " \"type\": [\"null\", {" + + " \"type\": \"record\"," + + " \"name\": \"Foo\"," + + " \"fields\": [" + + " {\"name\": \"field1\", \"type\": \"string\"}," + + " {\"name\": \"field2\", \"type\": \"int\"}" + + " ]" + + " }]," + + " \"default\": null" + + " }" + + "]" + + "}"; + + String requiredFooStr = "{" + + "\"type\": \"record\"," + + "\"name\": \"Foo\"," + + "\"fields\": [" + + " {\"name\": \"field1\", \"type\": \"string\"}" + + "]" + + "}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = parse(requiredFooStr); + + Schema fooFieldSchema = dataSchema.getField("foo").schema(); + Schema pruned = AvroSchemaUtils.pruneDataSchema(fooFieldSchema, requiredSchema, Collections.emptySet()); + + assertEquals(Schema.Type.UNION, pruned.getType()); + + Schema prunedRecord = pruned.getTypes().stream() + .filter(s -> s.getType() == Schema.Type.RECORD) + .collect(Collectors.toList()).get(0); + assertNotNull(prunedRecord.getField("field1")); + assertNull(prunedRecord.getField("field2")); + } + + @Test + void testArrayElementPruning() { + String dataSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\": \"record\", \"name\": \"Item\", \"fields\": [" + + "{\"name\": \"a\", \"type\": \"int\"}, {\"name\": \"b\", \"type\": \"string\"}" + + "]}}"; + + String requiredSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\": \"record\", \"name\": \"Item\", \"fields\": [" + + "{\"name\": \"b\", \"type\": \"string\"}" + + "]}}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = 
parse(requiredSchemaStr); + + Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema, requiredSchema, Collections.emptySet()); + Schema itemSchema = pruned.getElementType(); + + assertEquals(1, itemSchema.getFields().size()); + assertEquals("b", itemSchema.getFields().get(0).name()); + } + + @Test + void testMapValuePruning() { + String dataSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\": \"record\", \"name\": \"Entry\", \"fields\": [" + + "{\"name\": \"x\", \"type\": \"int\"}, {\"name\": \"y\", \"type\": \"string\"}" + + "]}}"; + + String requiredSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\": \"record\", \"name\": \"Entry\", \"fields\": [" + + "{\"name\": \"y\", \"type\": \"string\"}" + + "]}}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = parse(requiredSchemaStr); + + Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema, requiredSchema, Collections.emptySet()); + Schema valueSchema = pruned.getValueType(); + + assertEquals(1, valueSchema.getFields().size()); + assertEquals("y", valueSchema.getFields().get(0).name()); + } + + @Test + void testPruningExcludedFieldIsPreservedIfMissingInDataSchema() { + String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\", \"fields\": [" + + "{\"name\": \"existing\", \"type\": \"int\"}" + + "]}"; + + String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\", \"fields\": [" + + "{\"name\": \"existing\", \"type\": \"int\"}," + + "{\"name\": \"missing\", \"type\": \"string\", \"default\": \"default\"}" + + "]}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = parse(requiredSchemaStr); + + Set mandatoryFields = Collections.singleton("missing"); + + Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema, requiredSchema, mandatoryFields); + + assertEquals(2, pruned.getFields().size()); + assertNotNull(pruned.getField("missing")); + assertEquals("string", pruned.getField("missing").schema().getType().getName()); + } + + @Test + void testPruningMandatoryFieldsOnlyApplyToTopLevel() { + String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\", \"fields\": [" + + "{\"name\": \"existing\", \"type\": \"int\"}," + + "{\"name\": \"nestedRecord\", \"type\": {" + + " \"type\": \"record\", \"name\": \"NestedRec\", \"fields\": [" + + " {\"name\": \"nestedField\", \"type\": \"string\"}" + + " ]" + + "}}" + + "]}"; + + String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\", \"fields\": [" + + "{\"name\": \"existing\", \"type\": \"int\"}," + + "{\"name\": \"topLevelMissing\", \"type\": \"string\", \"default\": \"default\"}," + + "{\"name\": \"nestedRecord\", \"type\": {" + + " \"type\": \"record\", \"name\": \"NestedRec\", \"fields\": [" + + " {\"name\": \"nestedField\", \"type\": \"string\"}," + + " {\"name\": \"nestedMissing\", \"type\": \"int\", \"default\": 0}" + + " ]" + + "}}" + + "]}"; + + Schema dataSchema = parse(dataSchemaStr); + Schema requiredSchema = parse(requiredSchemaStr); + + // Both "topLevelMissing" and "nestedMissing" are in mandatory fields + // but only "topLevelMissing" should be preserved since mandatory fields + // only apply to top-level fields + Set mandatoryFields = new HashSet<>(Arrays.asList("topLevelMissing", "nestedMissing")); + + Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema, requiredSchema, mandatoryFields); + + // Should have 3 top-level fields: existing, topLevelMissing, nestedRecord + assertEquals(3, pruned.getFields().size()); + + // Top-level mandatory field should be preserved even though missing 
from data schema + assertNotNull(pruned.getField("topLevelMissing")); + assertEquals("string", pruned.getField("topLevelMissing").schema().getType().getName()); + + // Nested record should exist + assertNotNull(pruned.getField("nestedRecord")); + Schema nestedSchema = pruned.getField("nestedRecord").schema(); + + // Nested record should only have 1 field (nestedField) - nestedMissing should NOT be preserved + // because mandatory fields only apply to top-level + assertEquals(1, nestedSchema.getFields().size()); + assertNotNull(nestedSchema.getField("nestedField")); + assertNull(nestedSchema.getField("nestedMissing")); // This should be null - not preserved + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 04104040665f4..f7823cef0cdd7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -886,10 +886,10 @@ void testCreateFullName() { @Test public void testCreateNamePrefix() { - assertNull(HoodieAvroUtils.createNamePredix(true, new ArrayDeque<>(Collections.singletonList("field1")))); - assertEquals("field1", HoodieAvroUtils.createNamePredix(false, new ArrayDeque<>(Collections.singletonList("field1")))); - assertNull(HoodieAvroUtils.createNamePredix(false, new ArrayDeque<>())); - assertEquals("parent.child", HoodieAvroUtils.createNamePredix(false, new ArrayDeque<>(Arrays.asList("child", "parent")))); + assertNull(HoodieAvroUtils.createNamePrefix(true, new ArrayDeque<>(Collections.singletonList("field1")))); + assertEquals("field1", HoodieAvroUtils.createNamePrefix(false, new ArrayDeque<>(Collections.singletonList("field1")))); + assertNull(HoodieAvroUtils.createNamePrefix(false, new ArrayDeque<>())); + assertEquals("parent.child", HoodieAvroUtils.createNamePrefix(false, new ArrayDeque<>(Arrays.asList("child", "parent")))); } @ParameterizedTest diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java index a8d3e3cfab315..df9e7c4793a5a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java @@ -328,15 +328,30 @@ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema(HoodieFileFormat } } + private static Stream testArgsForDifferentBaseAndLogFormats() { + boolean supportsORC = supportedFileFormats.contains(HoodieFileFormat.ORC); + List args = new ArrayList<>(); + + if (supportsORC) { + args.add(arguments(HoodieFileFormat.ORC, "avro")); + } + + args.add(arguments(HoodieFileFormat.PARQUET, "avro")); + args.add(arguments(HoodieFileFormat.PARQUET, "parquet")); + + return args.stream(); + } + /** * Write a base file with schema A, then write a log file with schema A, then write another base file with schema B. 
*/ @ParameterizedTest - @MethodSource("supportedBaseFileFormatArgs") - public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles(HoodieFileFormat fileFormat) throws Exception { + @MethodSource("testArgsForDifferentBaseAndLogFormats") + public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles(HoodieFileFormat fileFormat, String logFileFormat) throws Exception { Map writeConfigs = new HashMap<>( getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true)); writeConfigs.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.name()); + writeConfigs.put(HoodieTableConfig.LOG_FILE_FORMAT.key(), logFileFormat); HoodieTestDataGenerator.SchemaEvolutionConfigs schemaEvolutionConfigs = getSchemaEvolutionConfigs(); if (fileFormat == HoodieFileFormat.ORC) { // ORC can support reading float as string, but it converts float to double to string causing differences in precision diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java index 5022f0cef926a..0c4199e2ad664 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.serde2.avro; +import org.apache.hudi.avro.AvroSchemaUtils; + import static org.apache.avro.Schema.Type.BOOLEAN; import static org.apache.avro.Schema.Type.BYTES; import static org.apache.avro.Schema.Type.DOUBLE; @@ -235,9 +237,8 @@ private static TypeInfo generateTypeInfoWorker(Schema schema, Set seenSchemas) throws AvroSerdeException { // Avro requires NULLable types to be defined as unions of some type T // and NULL. This is annoying and we're going to hide it from the user. 
- if (AvroSerdeUtils.isNullableType(schema)) { - return generateTypeInfo( - AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas); + if (AvroSchemaUtils.isNullable(schema)) { + return generateTypeInfo(AvroSchemaUtils.resolveNullableSchema(schema), seenSchemas); } Schema.Type type = schema.getType(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java index b401069055a5c..d77973111ff30 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java @@ -19,10 +19,12 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.HoodieRecordUtils; @@ -30,8 +32,9 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -44,6 +47,8 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -52,9 +57,11 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; +import org.apache.parquet.avro.AvroSchemaConverter; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -71,7 +78,6 @@ */ public class HiveHoodieReaderContext extends HoodieReaderContext { protected final HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator; - protected final Map columnTypeMap; private RecordReader firstRecordReader = null; private final List partitionCols; @@ -79,27 +85,28 @@ public class HiveHoodieReaderContext extends HoodieReaderContext protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator, List partitionCols, - ObjectInspectorCache objectInspectorCache, StorageConfiguration storageConfiguration, HoodieTableConfig tableConfig) { - super(storageConfiguration, tableConfig, Option.empty(), Option.empty(), new HiveRecordContext(tableConfig, storageConfiguration, objectInspectorCache)); + super(storageConfiguration, tableConfig, Option.empty(), Option.empty(), new HiveRecordContext(tableConfig)); + if 
(storageConfiguration.getString(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS).isEmpty()) { + // Overriding default treatment of repeated groups in Parquet + storageConfiguration.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false"); + } this.readerCreator = readerCreator; this.partitionCols = partitionCols; this.partitionColSet = new HashSet<>(this.partitionCols); - this.columnTypeMap = objectInspectorCache.getColumnTypeMap(); } private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) { List dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList()); - List dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> { - TypeInfo type = columnTypeMap.get(fieldName); - if (type == null) { - throw new IllegalArgumentException("Field: " + fieldName + ", does not have a defined type"); - } - return type; - }).collect(Collectors.toList()); jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList)); - jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(","))); + List columnTypes; + try { + columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema); + } catch (AvroSerdeException e) { + throw new HoodieAvroSchemaException(String.format("Failed to generate hive column types from avro schema: %s, due to %s", dataSchema, e)); + } + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","))); // don't replace `f -> f.name()` with lambda reference String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(",")); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames); @@ -121,6 +128,12 @@ public ClosableIterator getFileRecordIterator( private ClosableIterator getFileRecordIterator(StoragePath filePath, String[] hosts, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException { + // mdt file schema irregular and does not work with this logic. Also, log file evolution is handled inside the log block + boolean isParquetOrOrc = filePath.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension()) + || filePath.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension()); + Schema avroFileSchema = isParquetOrOrc ? HoodieIOFactory.getIOFactory(storage) + .getFileFormatUtils(filePath).readAvroSchema(storage, filePath) : dataSchema; + Schema actualRequiredSchema = isParquetOrOrc ? 
AvroSchemaUtils.pruneDataSchema(avroFileSchema, requiredSchema, Collections.emptySet()) : requiredSchema; JobConf jobConfCopy = new JobConf(storage.getConf().unwrapAs(Configuration.class)); if (getNeedsBootstrapMerge()) { // Hive PPD works at row-group level and only enabled when hive.optimize.index.filter=true; @@ -132,17 +145,17 @@ private ClosableIterator getFileRecordIterator(StoragePath filePa jobConfCopy.unset(ConvertAstToSearchArg.SARG_PUSHDOWN); } //move the partition cols to the end, because in some cases it has issues if we don't do that - Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream() + Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(avroFileSchema, Stream.concat(avroFileSchema.getFields().stream() .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)), - partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList())); - setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema); + partitionCols.stream().filter(c -> avroFileSchema.getField(c) != null)).collect(Collectors.toList())); + setSchemas(jobConfCopy, modifiedDataSchema, actualRequiredSchema); InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), start, length, hosts); RecordReader recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy); if (firstRecordReader == null) { firstRecordReader = recordReader; } ClosableIterator recordIterator = new RecordReaderValueIterator<>(recordReader); - if (modifiedDataSchema.equals(requiredSchema)) { + if (AvroSchemaUtils.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) { return recordIterator; } // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns @@ -207,7 +220,7 @@ public ArrayWritable next() { Writable[] skeletonWritable = skeletonFileIterator.next().get(); Writable[] dataWritable = dataFileIterator.next().get(); for (int i = 0; i < partitionFieldPositions.length; i++) { - if (dataWritable[partitionFieldPositions[i]] == null) { + if (dataWritable[partitionFieldPositions[i]] == null || dataWritable[partitionFieldPositions[i]] instanceof NullWritable) { dataWritable[partitionFieldPositions[i]] = convertedPartitionValues[i]; } } @@ -228,14 +241,7 @@ public void close() { @Override public UnaryOperator projectRecord(Schema from, Schema to, Map renamedColumns) { - if (!renamedColumns.isEmpty()) { - throw new IllegalStateException("Schema evolution is not supported in the filegroup reader for Hive currently"); - } - return HoodieArrayWritableAvroUtils.projectRecord(from, to); - } - - public UnaryOperator reverseProjectRecord(Schema from, Schema to) { - return HoodieArrayWritableAvroUtils.reverseProject(from, to); + return record -> HoodieArrayWritableAvroUtils.rewriteRecordWithNewSchema(record, from, to, renamedColumns); } public long getPos() throws IOException { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java index a022fb28bf712..2a1d73b0b9c21 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java @@ -26,11 +26,9 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.read.BufferedRecord; import org.apache.hudi.common.util.OrderingValues; -import 
org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.hadoop.utils.HiveAvroSerializer; import org.apache.hudi.hadoop.utils.HiveJavaTypeConverter; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; -import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -46,23 +44,23 @@ import org.apache.hadoop.io.WritableComparable; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class HiveRecordContext extends RecordContext { - private final ObjectInspectorCache objectInspectorCache; + private final Map serializerCache = new ConcurrentHashMap<>(); - public HiveRecordContext(HoodieTableConfig tableConfig, StorageConfiguration storageConf, ObjectInspectorCache objectInspectorCache) { - super(tableConfig, new HiveJavaTypeConverter()); - this.objectInspectorCache = objectInspectorCache; + private HiveAvroSerializer getHiveAvroSerializer(Schema schema) { + return serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new); } - public static Object getFieldValueFromArrayWritable(ArrayWritable record, Schema schema, String fieldName, ObjectInspectorCache objectInspectorCache) { - return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName); + public HiveRecordContext(HoodieTableConfig tableConfig) { + super(tableConfig, new HiveJavaTypeConverter()); } @Override public Object getValue(ArrayWritable record, Schema schema, String fieldName) { - return getFieldValueFromArrayWritable(record, schema, fieldName, objectInspectorCache); + return getHiveAvroSerializer(schema).getValue(record, fieldName); } @Override @@ -82,7 +80,7 @@ public HoodieRecord constructHoodieRecord(BufferedRecord f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)), partitionColumns.stream()).collect(Collectors.toList())); - this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema); + this.reverseProjection = HoodieArrayWritableAvroUtils.getReverseProjection(requestedSchema, outputSchema); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java index 71f9db9c1eeec..1f6b6bc3a9541 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java @@ -27,8 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.OrderingValues; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.hadoop.utils.HiveAvroSerializer; import org.apache.hudi.keygen.BaseKeyGenerator; import com.esotericsoftware.kryo.Kryo; @@ -37,7 +36,6 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.BooleanWritable; @@ -60,42 +58,37 @@ public class HoodieHiveRecord extends HoodieRecord { private boolean copy; - private final ArrayWritableObjectInspector objectInspector; - - private final ObjectInspectorCache objectInspectorCache; 
+ private final HiveAvroSerializer avroSerializer; protected Schema schema; - public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache) { + public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer) { super(key, data); - this.objectInspector = objectInspectorCache.getObjectInspector(schema); - this.objectInspectorCache = objectInspectorCache; + this.avroSerializer = avroSerializer; this.schema = schema; this.copy = false; isDelete = data == null; } - public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache, HoodieOperation hoodieOperation, boolean isDelete) { + public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, boolean isDelete) { super(key, data, hoodieOperation, isDelete, Option.empty()); - this.objectInspector = objectInspectorCache.getObjectInspector(schema); - this.objectInspectorCache = objectInspectorCache; + this.avroSerializer = avroSerializer; this.schema = schema; this.copy = false; } private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy, - ArrayWritableObjectInspector objectInspector, ObjectInspectorCache objectInspectorCache) { + HiveAvroSerializer avroSerializer) { super(key, data, operation, Option.empty()); this.schema = schema; this.copy = isCopy; isDelete = data == null; - this.objectInspector = objectInspector; - this.objectInspectorCache = objectInspectorCache; + this.avroSerializer = avroSerializer; } @Override public HoodieRecord newInstance() { - return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.objectInspector, this.objectInspectorCache); + return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.avroSerializer); } @Override @@ -174,7 +167,7 @@ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean c @Override public Object getColumnValueAsJava(Schema recordSchema, String column, Properties props) { - return HiveRecordContext.getFieldValueFromArrayWritable(data, schema, column, objectInspectorCache); + return avroSerializer.getValueAsJava(data, column); } @Override @@ -252,7 +245,7 @@ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) } private Object getValue(String name) { - return HoodieArrayWritableAvroUtils.getWritableValue(data, objectInspector, name); + return avroSerializer.getValue(data, name); } protected Schema getSchema() { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java index 0c3362ba98157..4278439fbb14e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java @@ -18,7 +18,10 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.exception.HoodieException; import org.apache.avro.JsonProperties; @@ -32,7 +35,10 @@ import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; import 
org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils; import org.apache.hadoop.hive.serde2.avro.InstanceCache; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; @@ -46,6 +52,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.slf4j.Logger; @@ -70,21 +77,170 @@ public class HiveAvroSerializer { private final List columnNames; private final List columnTypes; - private final ObjectInspector objectInspector; + private final ArrayWritableObjectInspector objectInspector; + private final Schema recordSchema; private static final Logger LOG = LoggerFactory.getLogger(HiveAvroSerializer.class); - public HiveAvroSerializer(ObjectInspector objectInspector, List columnNames, List columnTypes) { + public HiveAvroSerializer(Schema schema) { + schema = AvroSchemaUtils.resolveNullableSchema(schema); + if (schema.getType() != Schema.Type.RECORD) { + throw new IllegalArgumentException("Expected record schema, but got: " + schema); + } + this.recordSchema = schema; + this.columnNames = schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList()); + try { + this.columnTypes = HiveTypeUtils.generateColumnTypes(schema); + } catch (AvroSerdeException e) { + throw new HoodieAvroSchemaException(String.format("Failed to generate hive column types from avro schema: %s, due to %s", schema, e)); + } + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(this.columnNames, this.columnTypes); + this.objectInspector = new ArrayWritableObjectInspector(rowTypeInfo); + } + + public HiveAvroSerializer(ArrayWritableObjectInspector objectInspector, List columnNames, List columnTypes) { this.columnNames = columnNames; this.columnTypes = columnTypes; this.objectInspector = objectInspector; + this.recordSchema = null; + } + + public Object getValue(ArrayWritable record, String fieldName) { + if (StringUtils.isNullOrEmpty(fieldName)) { + return null; + } + Object currentObject = record; + ObjectInspector currentObjectInspector = this.objectInspector; + String[] path = fieldName.split("\\."); + + for (int i = 0; i < path.length; i++) { + String field = path[i]; + + while (currentObjectInspector.getCategory() == ObjectInspector.Category.UNION) { + UnionObjectInspector unionOI = (UnionObjectInspector) currentObjectInspector; + byte tag = unionOI.getTag(currentObject); + currentObject = unionOI.getField(currentObject); + currentObjectInspector = unionOI.getObjectInspectors().get(tag); + } + + if (!(currentObjectInspector instanceof ArrayWritableObjectInspector)) { + throw new HoodieException("Expected struct (ArrayWritableObjectInspector) to access field '" + + field + "', but found: " + currentObjectInspector.getClass().getSimpleName()); + } + + ArrayWritableObjectInspector structOI = (ArrayWritableObjectInspector) currentObjectInspector; + StructField structFieldRef = structOI.getStructFieldRef(field); + + if 
(structFieldRef == null) { + throw new HoodieException("Field '" + field + "' not found in current object inspector."); + } + + Object fieldData = structOI.getStructFieldData(currentObject, structFieldRef); + + if (i == path.length - 1) { + // Final field — return value as-is (possibly Writable or Java object) + return fieldData; + } + + currentObject = fieldData; + currentObjectInspector = structFieldRef.getFieldObjectInspector(); + } + + return null; + } + + public Object getValueAsJava(ArrayWritable record, String fieldName) { + String[] path = fieldName.split("\\."); + + FieldContext context = extractFieldFromRecord(record, + this.objectInspector, + this.columnTypes, + this.recordSchema, + path[0]); + + for (int i = 1; i < path.length; i++) { + if (!(context.object instanceof ArrayWritable)) { + throw new HoodieException("Expected ArrayWritable while resolving '" + path[i] + + "', but got " + context.object.getClass().getSimpleName()); + } + + if (context.objectInspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new HoodieException("Expected StructObjectInspector to access field '" + path[i] + + "', but found: " + context.objectInspector.getClass().getSimpleName()); + } + + if (!(context.typeInfo instanceof StructTypeInfo)) { + throw new HoodieException("Expected StructTypeInfo while resolving '" + path[i] + + "', but got " + context.typeInfo.getTypeName()); + } + + if (!(context.schema.getType() == Schema.Type.RECORD)) { + throw new HoodieException("Expected RecordSchema while resolving '" + path[i] + + "', but got " + context.schema.getType()); + } + + context = extractFieldFromRecord((ArrayWritable) context.object, (StructObjectInspector) context.objectInspector, + ((StructTypeInfo) context.typeInfo).getAllStructFieldTypeInfos(), context.schema, path[i]); + } + + return serialize(context.typeInfo, context.objectInspector, context.object, context.schema); + } + + private FieldContext extractFieldFromRecord(ArrayWritable record, StructObjectInspector structObjectInspector, + List fieldTypes, Schema schema, String fieldName) { + Schema.Field schemaField = schema.getField(fieldName); + if (schemaField == null) { + throw new HoodieException("Field '" + fieldName + "' not found in schema: " + schema); + } + + int fieldIdx = schemaField.pos(); + TypeInfo fieldTypeInfo = fieldTypes.get(fieldIdx); + Schema fieldSchema = resolveNullableSchema(schemaField.schema()); + + StructField structField = structObjectInspector.getStructFieldRef(fieldName); + if (structField == null) { + throw new HoodieException("Field '" + fieldName + "' not found in ObjectInspector"); + } + + Object fieldData = structObjectInspector.getStructFieldData(record, structField); + ObjectInspector fieldObjectInspector = structField.getFieldObjectInspector(); + + if (fieldObjectInspector.getCategory() == ObjectInspector.Category.UNION) { + UnionObjectInspector unionObjectInspector = (UnionObjectInspector) fieldObjectInspector; + byte tag = unionObjectInspector.getTag(fieldData); + fieldData = unionObjectInspector.getField(fieldData); + fieldObjectInspector = unionObjectInspector.getObjectInspectors().get(tag); + } + + return new FieldContext(fieldData, fieldObjectInspector, fieldTypeInfo, fieldSchema); + } + + private static class FieldContext { + final TypeInfo typeInfo; + final ObjectInspector objectInspector; + final Object object; + final Schema schema; + + FieldContext(Object object, ObjectInspector objectInspector, TypeInfo typeInfo, Schema schema) { + this.object = object; + this.objectInspector = 
objectInspector; + this.typeInfo = typeInfo; + this.schema = schema; + } } private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); + public GenericRecord serialize(Object o) { + if (recordSchema == null) { + throw new IllegalArgumentException("Cannot serialize without a record schema"); + } + return serialize(o, recordSchema); + } + public GenericRecord serialize(Object o, Schema schema) { - StructObjectInspector soi = (StructObjectInspector) objectInspector; + StructObjectInspector soi = objectInspector; GenericData.Record record = new GenericData.Record(schema); List outputFieldRefs = soi.getAllStructFieldRefs(); @@ -128,76 +284,13 @@ private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldD } } - /** - * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable - * types via a union of type T and null. This is a very common use case. - * As such, we want to silently convert it to just T and allow the value to be null. - *

- * When a Hive union type is used with AVRO, the schema type becomes - * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed - * - * @return true if type represents Union[T, Null], false otherwise - */ - public static boolean isNullableType(Schema schema) { - if (!schema.getType().equals(Schema.Type.UNION)) { - return false; - } - - List itemSchemas = schema.getTypes(); - if (itemSchemas.size() < 2) { - return false; - } - - for (Schema itemSchema : itemSchemas) { - if (Schema.Type.NULL.equals(itemSchema.getType())) { - return true; - } - } - - // [null, null] not allowed, so this check is ok. - return false; - } - - /** - * If the union schema is a nullable union, get the schema for the non-nullable type. - * This method does no checking that the provided Schema is nullable. If the provided - * union schema is non-nullable, it simply returns the union schema - */ - public static Schema getOtherTypeFromNullableType(Schema unionSchema) { - final List types = unionSchema.getTypes(); - if (types.size() == 2) { // most common scenario - if (types.get(0).getType() == Schema.Type.NULL) { - return types.get(1); - } - if (types.get(1).getType() == Schema.Type.NULL) { - return types.get(0); - } - // not a nullable union - return unionSchema; - } - - final List itemSchemas = new ArrayList<>(); - for (Schema itemSchema : types) { - if (!Schema.Type.NULL.equals(itemSchema.getType())) { - itemSchemas.add(itemSchema); - } - } - - if (itemSchemas.size() > 1) { - return Schema.createUnion(itemSchemas); - } else { - return itemSchemas.get(0); - } - } - private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { if (null == structFieldData) { return null; } - if (isNullableType(schema)) { - schema = getOtherTypeFromNullableType(schema); - } + schema = resolveNullableSchema(schema); + /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */ if (Schema.Type.ENUM.equals(schema.getType())) { assert fieldOI instanceof PrimitiveObjectInspector; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java index 76deb4caab4e0..64087331c63bd 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java @@ -19,68 +19,308 @@ package org.apache.hudi.hadoop.utils; -import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieAvroSchemaException; +import org.apache.hudi.exception.SchemaCompatibilityException; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; 
+import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.function.UnaryOperator; +import static org.apache.hudi.avro.AvroSchemaUtils.areSchemasProjectionEquivalent; +import static org.apache.hudi.avro.AvroSchemaUtils.isNullable; +import static org.apache.hudi.avro.HoodieAvroUtils.createFullName; +import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix; +import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming; +import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate; +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; + public class HoodieArrayWritableAvroUtils { - private static final Cache, int[]> - PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build(); + public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, Schema oldSchema, Schema newSchema, Map renameCols) { + return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>()); + } - public static int[] getProjection(Schema from, Schema to) { - return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { - List toFields = to.getFields(); - int[] newProjection = new int[toFields.size()]; - for (int i = 0; i < newProjection.length; i++) { - newProjection[i] = from.getField(toFields.get(i).name()).pos(); - } - return newProjection; - }); + private static Writable rewriteRecordWithNewSchema(Writable writable, Schema oldAvroSchema, Schema newAvroSchema, Map renameCols, Deque fieldNames) { + if (writable == null) { + return null; + } + Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema); + Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema); + if (areSchemasProjectionEquivalent(oldSchema, newSchema)) { + return writable; + } + return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, renameCols, fieldNames); } - /** - * Projection will keep the size from the "from" schema because it gets recycled - * and if the size changes the reader will fail - */ - public static UnaryOperator projectRecord(Schema from, Schema to) { - //TODO: [HUDI-8261] add casting to the projection - int[] projection = getProjection(from, to); - return arrayWritable -> { - Writable[] values = new Writable[arrayWritable.get().length]; - for (int i = 0; i < projection.length; i++) { - values[i] = arrayWritable.get()[projection[i]]; - } - arrayWritable.set(values); - return arrayWritable; - }; + private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { + switch (newSchema.getType()) { + case RECORD: + if (!(writable instanceof ArrayWritable)) { + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a record", writable.getClass().getName())); + } + + ArrayWritable arrayWritable = (ArrayWritable) writable; + List fields = newSchema.getFields(); + // projection will keep the size from the "from" schema because it gets recycled + // and if the size changes the reader will 
fail + boolean noFieldsRenaming = renameCols.isEmpty(); + String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames); + Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)]; + for (int i = 0; i < fields.size(); i++) { + Schema.Field newField = newSchema.getFields().get(i); + String newFieldName = newField.name(); + fieldNames.push(newFieldName); + Schema.Field oldField = noFieldsRenaming + ? oldSchema.getField(newFieldName) + : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols)); + if (oldField != null) { + values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames); + } else if (newField.defaultVal() instanceof JsonProperties.Null) { + values[i] = NullWritable.get(); + } else if (!isNullable(newField.schema()) && newField.defaultVal() == null) { + throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable"); + } else if (newField.defaultVal() != null) { + switch (AvroSchemaUtils.resolveNullableSchema(newField.schema()).getType()) { + case BOOLEAN: + values[i] = new BooleanWritable((Boolean) newField.defaultVal()); + break; + case INT: + values[i] = new IntWritable((Integer) newField.defaultVal()); + break; + case LONG: + values[i] = new LongWritable((Long) newField.defaultVal()); + break; + case FLOAT: + values[i] = new FloatWritable((Float) newField.defaultVal()); + break; + case DOUBLE: + values[i] = new DoubleWritable((Double) newField.defaultVal()); + break; + case STRING: + values[i] = new Text(newField.defaultVal().toString()); + break; + default: + throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has a default value of an unsupported type"); + } + } + fieldNames.pop(); + } + return new ArrayWritable(Writable.class, values); + + case ENUM: + if ((writable instanceof BytesWritable)) { + return writable; + } + if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) { + throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted to ENUM type. 
Schema type was %s", oldSchema.getType().getName())); + } + if (oldSchema.getType() == Schema.Type.STRING) { + return new BytesWritable(((Text) writable).copyBytes()); + } + return writable; + case ARRAY: + if (!(writable instanceof ArrayWritable)) { + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", writable.getClass().getName())); + } + ArrayWritable array = (ArrayWritable) writable; + fieldNames.push("element"); + for (int i = 0; i < array.get().length; i++) { + array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames); + } + fieldNames.pop(); + return array; + case MAP: + if (!(writable instanceof ArrayWritable)) { + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", writable.getClass().getName())); + } + ArrayWritable map = (ArrayWritable) writable; + fieldNames.push("value"); + for (int i = 0; i < map.get().length; i++) { + Writable mapEntry = map.get()[i]; + ((ArrayWritable) mapEntry).get()[1] = rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames); + } + return map; + + case UNION: + throw new IllegalArgumentException("should not be here?"); + + default: + return rewritePrimaryType(writable, oldSchema, newSchema); + } } - public static int[] getReverseProjection(Schema from, Schema to) { - return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { - List fromFields = from.getFields(); - int[] newProjection = new int[fromFields.size()]; - for (int i = 0; i < newProjection.length; i++) { - newProjection[i] = to.getField(fromFields.get(i).name()).pos(); + public static Writable rewritePrimaryType(Writable writable, Schema oldSchema, Schema newSchema) { + if (oldSchema.getType() == newSchema.getType()) { + switch (oldSchema.getType()) { + case NULL: + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BYTES: + case STRING: + return writable; + case FIXED: + if (oldSchema.getFixedSize() != newSchema.getFixedSize()) { + // Check whether this is a [[Decimal]]'s precision change + if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) oldSchema.getLogicalType(); + return HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + } else { + throw new HoodieAvroSchemaException("Fixed type size change is not currently supported"); + } + } + + // For [[Fixed]] data type both size and name have to match + // + // NOTE: That for values wrapped into [[Union]], to make sure that reverse lookup (by + // full-name) is working we have to make sure that both schema's name and namespace + // do match + if (Objects.equals(oldSchema.getFullName(), newSchema.getFullName())) { + return writable; + } else { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) oldSchema.getLogicalType(); + return HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + } + + default: + throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType()); } - return newProjection; - }); + } else { + return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, newSchema); + } + } + + private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable writable, Schema oldSchema, Schema newSchema) { + switch 
(newSchema.getType()) { + case NULL: + case BOOLEAN: + break; + case INT: + if (newSchema.getLogicalType() == LogicalTypes.date() && oldSchema.getType() == Schema.Type.STRING) { + return HoodieHiveUtils.getDateWriteable((HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString())))); + } + break; + case LONG: + if (oldSchema.getType() == Schema.Type.INT) { + return new LongWritable(((IntWritable) writable).get()); + } + break; + case FLOAT: + if ((oldSchema.getType() == Schema.Type.INT) + || (oldSchema.getType() == Schema.Type.LONG)) { + return oldSchema.getType() == Schema.Type.INT + ? new FloatWritable(((IntWritable) writable).get()) + : new FloatWritable(((LongWritable) writable).get()); + } + break; + case DOUBLE: + if (oldSchema.getType() == Schema.Type.FLOAT) { + // java float cannot convert to double directly, deal with float precision change + return new DoubleWritable(Double.parseDouble(((FloatWritable) writable).get() + "")); + } else if (oldSchema.getType() == Schema.Type.INT) { + return new DoubleWritable(((IntWritable) writable).get()); + } else if (oldSchema.getType() == Schema.Type.LONG) { + return new DoubleWritable(((LongWritable) writable).get()); + } + break; + case BYTES: + if (oldSchema.getType() == Schema.Type.STRING) { + return new BytesWritable(getUTF8Bytes(writable.toString())); + } + break; + case STRING: + if (oldSchema.getType() == Schema.Type.ENUM) { + return writable; + } + if (oldSchema.getType() == Schema.Type.BYTES) { + return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) writable).getBytes())); + } + if (oldSchema.getLogicalType() == LogicalTypes.date()) { + return new Text(toJavaDate(((IntWritable) writable).get()).toString()); + } + if (oldSchema.getType() == Schema.Type.INT + || oldSchema.getType() == Schema.Type.LONG + || oldSchema.getType() == Schema.Type.FLOAT + || oldSchema.getType() == Schema.Type.DOUBLE) { + return new Text(writable.toString()); + } + if (oldSchema.getType() == Schema.Type.FIXED && oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { + HiveDecimalWritable hdw = (HiveDecimalWritable) writable; + return new Text(hdw.getHiveDecimal().bigDecimalValue().toPlainString()); + } + break; + case FIXED: + if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) newSchema.getLogicalType(); + DecimalTypeInfo decimalTypeInfo = new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()); + + if (oldSchema.getType() == Schema.Type.STRING + || oldSchema.getType() == Schema.Type.INT + || oldSchema.getType() == Schema.Type.LONG + || oldSchema.getType() == Schema.Type.FLOAT + || oldSchema.getType() == Schema.Type.DOUBLE) { + // loses trailing zeros due to hive issue. 
Since we only read with hive, I think this is fine + HiveDecimalWritable converted = new HiveDecimalWritable(HiveDecimal.create(new BigDecimal(writable.toString()))); + return HiveDecimalUtils.enforcePrecisionScale(converted, decimalTypeInfo); + } + + if (oldSchema.getType() == Schema.Type.BYTES) { + ByteBuffer buffer = ByteBuffer.wrap(((BytesWritable) writable).getBytes()); + BigDecimal bd = new BigDecimal(new BigInteger(buffer.array()), decimal.getScale()); + HiveDecimalWritable converted = new HiveDecimalWritable(HiveDecimal.create(bd)); + return HiveDecimalUtils.enforcePrecisionScale(converted, decimalTypeInfo); + } + } + break; + default: + } + throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); + } + + private static int[] getReverseProjectionMapping(Schema from, Schema to) { + List fromFields = from.getFields(); + int[] newProjection = new int[fromFields.size()]; + for (int i = 0; i < newProjection.length; i++) { + newProjection[i] = to.getField(fromFields.get(i).name()).pos(); + } + return newProjection; } /** * After the reading and merging etc is done, we need to put the records * into the positions of the original schema */ - public static UnaryOperator reverseProject(Schema from, Schema to) { - int[] projection = getReverseProjection(from, to); + public static UnaryOperator getReverseProjection(Schema from, Schema to) { + int[] projection = getReverseProjectionMapping(from, to); return arrayWritable -> { Writable[] values = new Writable[to.getFields().size()]; for (int i = 0; i < projection.length; i++) { @@ -90,10 +330,5 @@ public static UnaryOperator reverseProject(Schema from, Schema to return arrayWritable; }; } - - public static Object getWritableValue(ArrayWritable arrayWritable, ArrayWritableObjectInspector objectInspector, String name) { - return objectInspector.getStructFieldData(arrayWritable, objectInspector.getStructFieldRef(name)); - } } - diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 72d1beab804b4..723e65c82a89d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -34,7 +34,6 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; @@ -58,6 +57,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -216,7 +216,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean case ENUM: return new BytesWritable(value.toString().getBytes()); case ARRAY: - GenericArray arrayValue = (GenericArray) value; + Collection arrayValue = (Collection) value; Writable[] arrayValues = new Writable[arrayValue.size()]; int arrayValueIndex = 0; for (Object obj : arrayValue) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java deleted file mode 100644 index 
f743ad81e51f1..0000000000000 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.hadoop.utils; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.mapred.JobConf; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -/** - * To read value from an ArrayWritable, an ObjectInspector is needed. - * Object inspectors are cached here or created using the column type map. - */ -public class ObjectInspectorCache { - private final Map columnTypeMap = new HashMap<>(); - private final Map objectInspectorCache = new HashMap<>(); - private final Map serializerCache = new HashMap<>(); - - public Map getColumnTypeMap() { - return columnTypeMap; - } - - public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) { - //From AbstractRealtimeRecordReader#prepareHiveAvroSerializer - // hive will append virtual columns at the end of column list. we should remove those columns. - // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 ,BLOCK__OFFSET__INSIDE__FILE ... - Set writerSchemaColNames = tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet()); - List columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList()); - List columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); - - int columnNameListLen = columnNameList.size() - 1; - for (int i = columnNameListLen; i >= 0; i--) { - String lastColName = columnNameList.get(columnNameList.size() - 1); - // virtual columns will only append at the end of column list. it will be ok to break the loop. 
- if (writerSchemaColNames.contains(lastColName)) { - break; - } - columnNameList.remove(columnNameList.size() - 1); - columnTypeList.remove(columnTypeList.size() - 1); - } - - //Use columnNameList.size() instead of columnTypeList because the type list is longer for some reason - IntStream.range(0, columnNameList.size()).boxed().forEach(i -> columnTypeMap.put(columnNameList.get(i), - TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0))); - - StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); - ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo); - objectInspectorCache.put(tableSchema, objectInspector); - } - - public Object getValue(ArrayWritable record, Schema schema, String fieldName) { - ArrayWritable currentRecord = record; - String[] path = fieldName.split("\\."); - StructField structFieldRef; - ArrayWritableObjectInspector objectInspector = getObjectInspector(schema); - for (int i = 0; i < path.length; i++) { - String field = path[i]; - structFieldRef = objectInspector.getStructFieldRef(field); - Object value = structFieldRef == null ? null : objectInspector.getStructFieldData(currentRecord, structFieldRef); - if (i == path.length - 1) { - return value; - } - currentRecord = (ArrayWritable) value; - objectInspector = (ArrayWritableObjectInspector) structFieldRef.getFieldObjectInspector(); - } - return null; - } - - public ArrayWritableObjectInspector getObjectInspector(Schema schema) { - return objectInspectorCache.computeIfAbsent(schema, s -> { - List columnNameList = s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList()); - List columnTypeList = columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList()); - StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); - return new ArrayWritableObjectInspector(rowTypeInfo); - }); - } - - public GenericRecord serialize(ArrayWritable record, Schema schema) { - return serializerCache.computeIfAbsent(schema, s -> { - List columnNameList = s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList()); - List columnTypeList = columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList()); - StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); - return new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList); - }).serialize(record, schema); - } -} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java index d5538cfa64e21..76bbbbc1d83d5 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHiveHoodieReaderContext.java @@ -22,19 +22,16 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.read.BufferedRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; -import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -59,14 +56,9 @@ class TestHiveHoodieReaderContext { @Test void getRecordKeyWithSingleKey() { - JobConf jobConf = getJobConf(); - - Schema schema = getBaseSchema(); - ObjectInspectorCache objectInspectorCache = new ObjectInspectorCache(schema, jobConf); - when(tableConfig.populateMetaFields()).thenReturn(false); when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"field_1"})); - HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), objectInspectorCache, storageConfiguration, tableConfig); + HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), storageConfiguration, tableConfig); ArrayWritable row = new ArrayWritable(Writable.class, new Writable[]{new Text("value1"), new Text("value2"), new ArrayWritable(new String[]{"value3"})}); assertEquals("value1", avroReaderContext.getRecordContext().getRecordKey(row, getBaseSchema())); @@ -74,14 +66,9 @@ void getRecordKeyWithSingleKey() { @Test void getRecordKeyWithMultipleKeys() { - JobConf jobConf = getJobConf(); - - Schema schema = getBaseSchema(); - ObjectInspectorCache objectInspectorCache = new ObjectInspectorCache(schema, jobConf); - when(tableConfig.populateMetaFields()).thenReturn(false); when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"field_1", "field_3.nested_field"})); - HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), objectInspectorCache, storageConfiguration, tableConfig); + HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), storageConfiguration, tableConfig); ArrayWritable row = new ArrayWritable(Writable.class, new Writable[]{new Text("value1"), new Text("value2"), new ArrayWritable(new String[]{"value3"})}); assertEquals("field_1:value1,field_3.nested_field:value3", avroReaderContext.getRecordContext().getRecordKey(row, getBaseSchema())); @@ -89,13 +76,8 @@ void getRecordKeyWithMultipleKeys() { @Test void getNestedField() { - JobConf jobConf = getJobConf(); - - Schema schema = getBaseSchema(); - ObjectInspectorCache objectInspectorCache = new ObjectInspectorCache(schema, jobConf); - when(tableConfig.populateMetaFields()).thenReturn(true); - HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), objectInspectorCache, storageConfiguration, tableConfig); + HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext(readerCreator, Collections.emptyList(), storageConfiguration, tableConfig); ArrayWritable row = new ArrayWritable(Writable.class, new Writable[]{new Text("value1"), new Text("value2"), new ArrayWritable(new String[]{"value3"})}); assertEquals("value3", avroReaderContext.getRecordContext().getValue(row, getBaseSchema(), "field_3.nested_field").toString()); @@ -103,12 +85,9 @@ void getNestedField() { @Test void testConstructEngineRecordWithNoUpdates() { - JobConf jobConf = getJobConf(); - Schema schema = getBaseSchema(); - ObjectInspectorCache objectInspectorCache = new ObjectInspectorCache(schema, jobConf); when(tableConfig.populateMetaFields()).thenReturn(true); 
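+    // Meta fields are enabled on the mocked table config, and the reader context is created with no stored partition fields.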
HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext( - readerCreator, Collections.emptyList(), objectInspectorCache, storageConfiguration, tableConfig); + readerCreator, Collections.emptyList(), storageConfiguration, tableConfig); ArrayWritable base = createBaseRecord(new Writable[]{ new IntWritable(1), @@ -127,12 +106,9 @@ void testConstructEngineRecordWithNoUpdates() { @Test void testConstructEngineRecordWithUpdates() { - JobConf jobConf = getJobConf(); - Schema schema = getBaseSchema(); - ObjectInspectorCache objectInspectorCache = new ObjectInspectorCache(schema, jobConf); when(tableConfig.populateMetaFields()).thenReturn(true); HiveHoodieReaderContext avroReaderContext = new HiveHoodieReaderContext( - readerCreator, Collections.emptyList(), objectInspectorCache, storageConfiguration, tableConfig); + readerCreator, Collections.emptyList(), storageConfiguration, tableConfig); ArrayWritable base = createBaseRecord(new Writable[]{ new IntWritable(1), @@ -151,13 +127,6 @@ void testConstructEngineRecordWithUpdates() { assertTrue(((BooleanWritable) values[2]).get()); } - private JobConf getJobConf() { - JobConf jobConf = new JobConf(storageConfiguration.unwrapAs(Configuration.class)); - jobConf.set("columns", "field_1,field_2,field_3,datestr"); - jobConf.set("columns.types", "string,string,struct,string"); - return jobConf; - } - private static Schema getBaseSchema() { Schema baseDataSchema = Schema.createRecord("test", null, null, false); Schema.Field baseField1 = new Schema.Field("field_1", Schema.create(Schema.Type.STRING)); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java index d759d06c14c59..d3c8e186551e4 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java @@ -27,22 +27,21 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.hadoop.utils.HiveAvroSerializer; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.math.BigDecimal; import java.time.LocalDate; +import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; class TestHoodieHiveRecord { private HoodieHiveRecord hoodieHiveRecord; - @Mock - private ObjectInspectorCache mockObjectInspectorCache; @BeforeEach void setUp() { @@ -51,10 +50,11 @@ void setUp() { // Create a minimal HoodieHiveRecord instance with mocked dependencies HoodieKey key = new HoodieKey("test-key", "test-partition"); ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new Text("test")}); - Schema schema = Schema.create(Schema.Type.STRING); + Schema schema = Schema.createRecord("TestRecord", null, null, false); + schema.setFields(Collections.singletonList(new Schema.Field("testField", Schema.create(Schema.Type.STRING), null, null))); // Create HoodieHiveRecord with mocked dependencies - hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, mockObjectInspectorCache); + hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, new HiveAvroSerializer(schema)); } @Test diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java index 31a9c1781b47d..d22093c242237 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java @@ -19,18 +19,22 @@ package org.apache.hudi.hadoop.utils; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.junit.jupiter.api.Test; @@ -38,9 +42,17 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHiveAvroSerializer { @@ -156,4 +168,245 @@ private List createHiveTypeInfoFrom(final String columnsTypeStr) { return columnTypes; } + + private static final String SCHEMA_WITH_NESTED_RECORD = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"address\", \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Address\",\n" + + " \"fields\": [\n" + + " {\"name\": \"city\", \"type\": \"string\"},\n" + + " {\"name\": \"zip\", \"type\": \"int\"}\n" + + " ]\n" + + " }}\n" + + " ]\n" + + "}"; + + private static final String SCHEMA_WITH_ARRAY_AND_MAP = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"ComplexRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"tags\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" + + " {\"name\": \"properties\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}\n" + + " ]\n" + + "}"; + + @Test + public void testGetTopLevelFields() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(101), + new Text("John Doe"), + new ArrayWritable(Writable.class, new Writable[]{ + new Text("New York"), + new IntWritable(10001) + }) + }); + + assertEquals(new IntWritable(101), serializer.getValue(record, "id")); + assertEquals(new Text("John Doe"), serializer.getValue(record, "name")); + } + + 
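+  // The tests below exercise both lookup paths: getValue returns the raw Writable stored at a dot-separated path
+  // (Text, IntWritable, or a nested ArrayWritable), while getValueAsJava additionally runs the value through the
+  // Avro serializer, yielding Avro-style Java values such as Utf8 for strings, Collection for arrays, and Map for maps.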
@Test + public void testGetNestedFields() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(202), + new Text("Alice"), + new ArrayWritable(Writable.class, new Writable[]{ + new Text("San Francisco"), + new IntWritable(94107) + }) + }); + + assertEquals(new Text("San Francisco"), serializer.getValue(record, "address.city")); + assertEquals(new IntWritable(94107), serializer.getValue(record, "address.zip")); + } + + @Test + public void testInvalidFieldNameThrows() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(303), + new Text("Bob"), + new ArrayWritable(Writable.class, new Writable[]{ + new Text("Los Angeles"), + new IntWritable(90001) + }) + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValue(record, "nonexistent"); + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValue(record, "address.nonexistent"); + }); + } + + @Test + public void testGetValueFromArrayOrMap() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{ + new Text("a"), new Text("b") + }); + + ArrayWritable propertiesMap = new ArrayWritable(Writable.class, new Writable[]{ + new ArrayWritable(Writable.class, new Writable[]{new Text("key1"), new Text("val1")}), + new ArrayWritable(Writable.class, new Writable[]{new Text("key2"), new Text("val2")}) + }); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(1), tagsArray, propertiesMap + }); + + // Access the entire field is ok + Object tagsResult = serializer.getValue(record, "tags"); + assertInstanceOf(ArrayWritable.class, tagsResult); + assertEquals(tagsArray, tagsResult); + + Object propertiesResult = serializer.getValue(record, "properties"); + assertInstanceOf(ArrayWritable.class, propertiesResult); + assertEquals(propertiesMap, propertiesResult); + + // access element or key/value is not ok + assertThrows(HoodieException.class, () -> { + serializer.getValue(record, "tags.element"); + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValue(record, "properties.key"); + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValue(record, "properties.value"); + }); + } + + @Test + public void testGetJavaTopLevelFields() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(101), + new Text("John Doe"), + new ArrayWritable(Writable.class, new Writable[]{ + new Text("New York"), + new IntWritable(10001) + }) + }); + + assertEquals(101, serializer.getValueAsJava(record, "id")); + assertEquals(new Utf8("John Doe"), serializer.getValueAsJava(record, "name")); + } + + @Test + public void testGetJavaNestedFields() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(202), + new Text("Alice"), + 
new ArrayWritable(Writable.class, new Writable[]{ + new Text("San Francisco"), + new IntWritable(94107) + }) + }); + + assertEquals(new Utf8("San Francisco"), serializer.getValueAsJava(record, "address.city")); + assertEquals(94107, serializer.getValueAsJava(record, "address.zip")); + } + + @Test + public void testGetJavaArrayAndMap() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{ + new Text("a"), new Text("b") + }); + + ArrayWritable propertiesMap = new ArrayWritable(Writable.class, new Writable[]{ + new ArrayWritable(Writable.class, new Writable[]{new Text("key1"), new Text("val1")}), + new ArrayWritable(Writable.class, new Writable[]{new Text("key2"), new Text("val2")}) + }); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(1), tagsArray, propertiesMap + }); + + Object tags = serializer.getValueAsJava(record, "tags"); + assertInstanceOf(Collection.class, tags); + + Collection tagList = (Collection) tags; + List expectedValues = Arrays.asList(new Utf8("a"), new Utf8("b")); + + Iterator actualIter = tagList.iterator(); + Iterator expectedIter = expectedValues.iterator(); + + while (expectedIter.hasNext() && actualIter.hasNext()) { + Object actual = actualIter.next(); + Utf8 expected = expectedIter.next(); + assertEquals(expected, actual); + } + + assertFalse(actualIter.hasNext(), "Actual has more elements than expected"); + assertFalse(expectedIter.hasNext(), "Expected has more elements than actual"); + + + Object props = serializer.getValueAsJava(record, "properties"); + assertInstanceOf(Map.class, props); + + Map resultMap = new HashMap<>(); + resultMap.put(new Utf8("key1"), new Utf8("val1")); + resultMap.put(new Utf8("key2"), new Utf8("val2")); + + assertEquals(resultMap, props); + } + + @Test + public void testGetJavaInvalidFieldAccess() { + Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP); + HiveAvroSerializer serializer = new HiveAvroSerializer(schema); + + ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{ + new Text("a"), new Text("b") + }); + + ArrayWritable propertiesMap = new ArrayWritable(Writable.class, new Writable[]{ + new ArrayWritable(Writable.class, new Writable[]{new Text("key1"), new Text("val1")}), + new ArrayWritable(Writable.class, new Writable[]{new Text("key2"), new Text("val2")}) + }); + + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new IntWritable(1), tagsArray, propertiesMap + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValueAsJava(record, "tags.element"); + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValueAsJava(record, "properties.key"); + }); + + assertThrows(HoodieException.class, () -> { + serializer.getValueAsJava(record, "properties.value"); + }); + } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java index e318880c4441c..5febe8f600837 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java @@ -22,32 +22,49 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.engine.RecordContext; import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.hadoop.HiveHoodieReaderContext; import org.apache.hudi.hadoop.HiveRecordContext; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; import java.util.Arrays; -import java.util.List; +import java.util.Collections; import java.util.function.UnaryOperator; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -55,43 +72,33 @@ public class TestHoodieArrayWritableAvroUtils { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); Schema tableSchema = HoodieTestDataGenerator.AVRO_SCHEMA; - ObjectInspectorCache objectInspectorCache; - - @BeforeEach - public void setup() { - List fields = tableSchema.getFields(); - Configuration conf = HoodieTestUtils.getDefaultStorageConf().unwrap(); - JobConf jobConf = new JobConf(conf); - jobConf.set(serdeConstants.LIST_COLUMNS, fields.stream().map(Schema.Field::name).collect(Collectors.joining(","))); - jobConf.set(serdeConstants.LIST_COLUMN_TYPES, HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES); - objectInspectorCache = new ObjectInspectorCache(HoodieTestDataGenerator.AVRO_SCHEMA, jobConf); - } @Test public void testProjection() { Schema from = tableSchema; Schema to = HoodieAvroUtils.generateProjectionSchema(from, Arrays.asList("trip_type", "current_ts", "weight")); - UnaryOperator projection = HoodieArrayWritableAvroUtils.projectRecord(from, to); - UnaryOperator reverseProjection = HoodieArrayWritableAvroUtils.reverseProject(to, from); + UnaryOperator reverseProjection = 
HoodieArrayWritableAvroUtils.getReverseProjection(to, from); //We reuse the ArrayWritable, so we need to get the values before projecting ArrayWritable record = convertArrayWritable(dataGen.generateGenericRecord()); - Object tripType = objectInspectorCache.getValue(record, from, "trip_type"); - Object currentTs = objectInspectorCache.getValue(record, from, "current_ts"); - Object weight = objectInspectorCache.getValue(record, from, "weight"); + HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from); + Object tripType = fromSerializer.getValue(record, "trip_type"); + Object currentTs = fromSerializer.getValue(record, "current_ts"); + Object weight = fromSerializer.getValue(record, "weight"); //Make sure the projected fields can be read - ArrayWritable projectedRecord = projection.apply(record); - assertEquals(tripType, objectInspectorCache.getValue(projectedRecord, to, "trip_type")); - assertEquals(currentTs, objectInspectorCache.getValue(projectedRecord, to, "current_ts")); - assertEquals(weight, objectInspectorCache.getValue(projectedRecord, to, "weight")); + ArrayWritable projectedRecord = HoodieArrayWritableAvroUtils.rewriteRecordWithNewSchema(record, from, to, Collections.emptyMap()); + HiveAvroSerializer toSerializer = new HiveAvroSerializer(to); + assertEquals(tripType, toSerializer.getValue(projectedRecord, "trip_type")); + assertEquals(currentTs, toSerializer.getValue(projectedRecord, "current_ts")); + assertEquals(weight, toSerializer.getValue(projectedRecord, "weight")); //Reverse projection, the fields are in the original spots, but only the fields we set can be read. //Therefore, we can only check the 3 fields that were in the projection ArrayWritable reverseProjected = reverseProjection.apply(projectedRecord); - assertEquals(tripType, objectInspectorCache.getValue(reverseProjected, from, "trip_type")); - assertEquals(currentTs, objectInspectorCache.getValue(reverseProjected, from, "current_ts")); - assertEquals(weight, objectInspectorCache.getValue(reverseProjected, from, "weight")); + assertEquals(tripType, fromSerializer.getValue(reverseProjected, "trip_type")); + assertEquals(currentTs, fromSerializer.getValue(reverseProjected, "current_ts")); + assertEquals(weight, fromSerializer.getValue(reverseProjected, "weight")); } private static ArrayWritable convertArrayWritable(GenericRecord record) { @@ -118,4 +125,282 @@ public void testCastOrderingField() { WritableComparable reflexive = new IntWritable(8675309); assertEquals(reflexive, readerContext.getRecordContext().convertValueToEngineType(reflexive)); } + + @Test + void testRewriteStringToDateInt() throws AvroSerdeException { + Schema oldSchema = Schema.create(Schema.Type.STRING); + Schema newSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + Writable oldWritable = new Text("2023-01-01"); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = HoodieHiveUtils.getDateWriteable(HoodieAvroUtils.fromJavaDate(Date.valueOf("2023-01-01"))); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteIntToLong() throws AvroSerdeException { + Writable oldWritable = new IntWritable(42); + Schema oldSchema = Schema.create(Schema.Type.INT); + Schema newSchema = Schema.create(Schema.Type.LONG); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = new LongWritable(42); + 
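+    // The assertion below checks the Writable-level result; validateRewriteWithAvro then cross-checks it against
+    // HoodieAvroUtils.rewritePrimaryType, the Avro-level rewrite for the same pair of schemas.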
assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteLongToFloat() throws AvroSerdeException { + Writable oldWritable = new LongWritable(123); + Schema oldSchema = Schema.create(Schema.Type.LONG); + Schema newSchema = Schema.create(Schema.Type.FLOAT); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = new FloatWritable(123.0f); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteFloatToDouble() throws AvroSerdeException { + Writable oldWritable = new FloatWritable(3.14f); + Schema oldSchema = Schema.create(Schema.Type.FLOAT); + Schema newSchema = Schema.create(Schema.Type.DOUBLE); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = new DoubleWritable(3.14d); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteBytesToString() throws AvroSerdeException { + BytesWritable oldWritable = new BytesWritable("hello".getBytes()); + Schema oldSchema = Schema.create(Schema.Type.BYTES); + Schema newSchema = Schema.create(Schema.Type.STRING); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = new Text("hello"); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteIntToString() throws AvroSerdeException { + Writable oldWritable = new IntWritable(123); + Schema oldSchema = Schema.create(Schema.Type.INT); + Schema newSchema = Schema.create(Schema.Type.STRING); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, newSchema); + Writable expected = new Text("123"); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema); + } + + @Test + void testRewriteFixedDecimalToString() throws AvroSerdeException { + Schema decimalSchema = LogicalTypes.decimal(10, 2).addToSchema(Schema.createFixed("decimal", null, null, 5)); + HiveDecimalWritable oldWritable = new HiveDecimalWritable(HiveDecimal.create(new BigDecimal("123.45"))); + Schema newSchema = Schema.create(Schema.Type.STRING); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, decimalSchema, newSchema); + Writable expected = new Text("123.45"); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, decimalSchema, result, newSchema); + } + + @Test + void testRewriteStringToFixedDecimal() throws AvroSerdeException { + Schema decimalSchema = LogicalTypes.decimal(10, 2).addToSchema(Schema.createFixed("decimal", null, null, 5)); + Writable oldWritable = new Text("123.45"); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, Schema.create(Schema.Type.STRING), decimalSchema); + assertInstanceOf(HiveDecimalWritable.class, result); + assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.STRING), result, decimalSchema); + } + + @Test + void testRewriteBytesToFixedDecimal() throws AvroSerdeException { + BigDecimal input = new BigDecimal("123.45"); + byte[] bytes = input.unscaledValue().toByteArray(); + BytesWritable oldWritable = new 
BytesWritable(bytes); + Schema decimalSchema = LogicalTypes.decimal(5, 2).addToSchema(Schema.createFixed("decimal", null, null, 5)); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, Schema.create(Schema.Type.BYTES), decimalSchema); + assertEquals(input, ((HiveDecimalWritable) result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), result, decimalSchema); + } + + @Test + void testUnsupportedTypeConversionThrows() { + Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.INT)); + Schema newSchema = Schema.create(Schema.Type.STRING); + assertThrows(HoodieAvroSchemaException.class, () -> + HoodieArrayWritableAvroUtils.rewritePrimaryType(null, oldSchema, newSchema)); + } + + @Test + void testRewriteEnumToString() throws AvroSerdeException { + Schema enumSchema = Schema.createEnum("TestEnum", null, null, Arrays.asList("A", "B", "C")); + Writable oldWritable = new Text("B"); + Schema newSchema = Schema.create(Schema.Type.STRING); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, enumSchema, newSchema); + Writable expected = new Text("B"); + assertEquals(expected, result); + validateRewriteWithAvro(oldWritable, enumSchema, result, newSchema); + } + + @Test + void testRewriteFixedWithSameSizeAndFullName() { + Schema oldFixed = Schema.createFixed("decimal", null, "ns", 5); + Schema newFixed = Schema.createFixed("decimal", null, "ns", 5); + HiveDecimalWritable hdw = new HiveDecimalWritable(HiveDecimal.create("123.45")); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(hdw, oldFixed, newFixed); + assertSame(hdw, result); + } + + @Test + void testRewriteFixedWithSameSizeButDifferentNameUsesDecimalFallback() throws AvroSerdeException { + Schema oldFixed = LogicalTypes.decimal(5, 2).addToSchema(Schema.createFixed("decA", null, "ns1", 5)); + Schema newFixed = LogicalTypes.decimal(5, 2).addToSchema(Schema.createFixed("decB", null, "ns2", 5)); + HiveDecimalWritable oldWritable = new HiveDecimalWritable(HiveDecimal.create("123.45")); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldFixed, newFixed); + assertInstanceOf(HiveDecimalWritable.class, result); + assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, oldFixed, result, newFixed); + } + + @Test + void testRewriteBooleanPassthrough() { + Schema boolSchema = Schema.create(Schema.Type.BOOLEAN); + BooleanWritable bool = new BooleanWritable(true); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(bool, boolSchema, boolSchema); + assertSame(bool, result); + } + + @Test + void testUnsupportedRewriteMapToIntThrows() { + Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.STRING)); + Schema newSchema = Schema.create(Schema.Type.INT); + assertThrows(HoodieAvroSchemaException.class, () -> + HoodieArrayWritableAvroUtils.rewritePrimaryType(new Text("foo"), oldSchema, newSchema)); + } + + @Test + void testRewriteIntToDecimalFixed() throws AvroSerdeException { + Schema fixedDecimal = LogicalTypes.decimal(8, 2).addToSchema(Schema.createFixed("dec", null, null, 6)); + IntWritable oldWritable = new IntWritable(12345); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, Schema.create(Schema.Type.INT), fixedDecimal); + assertInstanceOf(HiveDecimalWritable.class, result); + assertEquals(new BigDecimal("12345"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.INT), result, fixedDecimal); + } + + @Test + void testRewriteDoubleToDecimalFixed() throws AvroSerdeException { + Schema fixedDecimal = LogicalTypes.decimal(10, 3).addToSchema(Schema.createFixed("dec", null, null, 8)); + DoubleWritable oldWritable = new DoubleWritable(987.654); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, Schema.create(Schema.Type.DOUBLE), fixedDecimal); + assertInstanceOf(HiveDecimalWritable.class, result); + assertEquals(new BigDecimal("987.654"), ((HiveDecimalWritable) result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.DOUBLE), result, fixedDecimal); + } + + @Test + void testRewriteDecimalBytesToFixed() throws AvroSerdeException { + Schema decimalSchema = LogicalTypes.decimal(6, 2).addToSchema(Schema.createFixed("dec", null, null, 6)); + BigDecimal value = new BigDecimal("999.99"); + byte[] unscaledBytes = value.unscaledValue().toByteArray(); + BytesWritable oldWritable = new BytesWritable(unscaledBytes); + Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, Schema.create(Schema.Type.BYTES), decimalSchema); + assertEquals(value, ((HiveDecimalWritable) result).getHiveDecimal().bigDecimalValue()); + validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), result, decimalSchema); + } + + private void validateRewriteWithAvro( + Writable oldWritable, + Schema oldSchema, + Writable newWritable, + Schema newSchema + ) throws AvroSerdeException { + TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, Collections.emptySet()); + TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, Collections.emptySet()); + + ObjectInspector oldObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo); + ObjectInspector newObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo); + + ObjectInspector writableOIOld = getWritableOIForType(oldTypeInfo); + ObjectInspector writableOINew = getWritableOIForType(newTypeInfo); + + Object javaInput = ObjectInspectorConverters.getConverter(writableOIOld, oldObjectInspector).convert(oldWritable); + if (isDecimalSchema(oldSchema)) { + javaInput = HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(getDecimalValue(javaInput, oldSchema), oldSchema, oldSchema.getLogicalType()); + } else if (javaInput instanceof byte[]) { + javaInput = ByteBuffer.wrap((byte[]) javaInput); + } + Object javaOutput = HoodieAvroUtils.rewritePrimaryType(javaInput, oldSchema, newSchema); + Object javaExpected = ObjectInspectorConverters.getConverter(writableOINew, newObjectInspector).convert(newWritable); + + if (isDecimalSchema(newSchema)) { + BigDecimal outputDecimal = getDecimalValue(javaOutput, newSchema); + BigDecimal expectedDecimal = getDecimalValue(javaExpected, newSchema); + assertEquals(0, outputDecimal.compareTo(expectedDecimal)); + } else if (newSchema.getLogicalType() instanceof LogicalTypes.Date) { + assertEquals(HoodieAvroUtils.toJavaDate((int) javaOutput), javaExpected); + } else { + assertEquals(javaOutput, javaExpected); + } + } + + private boolean isDecimalSchema(Schema schema) { + return schema.getLogicalType() instanceof LogicalTypes.Decimal; + } + + private BigDecimal getDecimalValue(Object value, Schema decimalSchema) { + if (value instanceof HiveDecimal) { + return ((HiveDecimal) value).bigDecimalValue(); + } else if (value instanceof 
HiveDecimalWritable) { + return ((HiveDecimalWritable) value).getHiveDecimal().bigDecimalValue(); + } else if (value instanceof BigDecimal) { + return (BigDecimal) value; + } else if (value instanceof byte[]) { + int scale = ((LogicalTypes.Decimal) decimalSchema.getLogicalType()).getScale(); + return new BigDecimal(new BigInteger((byte[]) value), scale); + } else if (value instanceof GenericData.Fixed) { + int scale = ((LogicalTypes.Decimal) decimalSchema.getLogicalType()).getScale(); + byte[] bytes = ((GenericData.Fixed) value).bytes(); + return new BigDecimal(new BigInteger(bytes), scale); + } + throw new IllegalArgumentException("Unsupported decimal object: " + value.getClass() + " -> " + value); + } + + private ObjectInspector getWritableOIForType(TypeInfo typeInfo) { + switch (typeInfo.getCategory()) { + case PRIMITIVE: + PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo; + switch (pti.getPrimitiveCategory()) { + case BOOLEAN: + return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; + case BYTE: + return PrimitiveObjectInspectorFactory.writableByteObjectInspector; + case SHORT: + return PrimitiveObjectInspectorFactory.writableShortObjectInspector; + case INT: + return PrimitiveObjectInspectorFactory.writableIntObjectInspector; + case LONG: + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + case FLOAT: + return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; + case DOUBLE: + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + case STRING: + case CHAR: + case VARCHAR: + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + case BINARY: + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + case DECIMAL: + return PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector; + case DATE: + return PrimitiveObjectInspectorFactory.writableDateObjectInspector; + default: + throw new UnsupportedOperationException("Unsupported primitive type: " + pti.getPrimitiveCategory()); + } + default: + throw new UnsupportedOperationException("Unsupported category: " + typeInfo.getCategory()); + } + } }
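
For reference, a minimal standalone sketch (not part of the patch) of the Writable-level type promotion these tests exercise, cross-checked against the Avro-level rewrite the same way validateRewriteWithAvro does. Both rewritePrimaryType calls are taken from the test code above; the import path of HoodieArrayWritableAvroUtils and the class/file placement of this sketch are assumptions.

import org.apache.avro.Schema;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; // assumed package

public class RewritePrimaryTypeSketch {
  public static void main(String[] args) {
    Schema intSchema = Schema.create(Schema.Type.INT);
    Schema longSchema = Schema.create(Schema.Type.LONG);

    // Writable-level promotion under test: INT -> LONG.
    Writable promoted = HoodieArrayWritableAvroUtils.rewritePrimaryType(new IntWritable(42), intSchema, longSchema);

    // Avro-level rewrite of the equivalent Java value, used as the reference behavior.
    Object avroPromoted = HoodieAvroUtils.rewritePrimaryType(42, intSchema, longSchema);

    System.out.println(promoted.equals(new LongWritable(42)));                    // expected: true
    System.out.println(avroPromoted.equals(((LongWritable) promoted).get()));     // expected: true
  }
}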