Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7e3cc31
fix schema-on-read on Hive
Jul 30, 2025
ff7a559
get rid of nullable check
Jul 30, 2025
ed44441
rename util method
Jul 30, 2025
0ef0f87
remove java restriction
Jul 30, 2025
de5c67c
revert java changes and move to new pr
Jul 30, 2025
3ba37c5
fix hive partition issue
Jul 30, 2025
041a135
fix hive tests
Jul 31, 2025
306d9e2
Merge branch 'master' into fix_schema_on_write_hive
Aug 2, 2025
ab9e84f
address review comments and apply fix that was done for avro
Aug 2, 2025
c7b4b8f
add testing
Aug 2, 2025
baedc96
get rid of object inspector cache
Aug 2, 2025
bfd9aa1
simplify if-statement in HoodieAvroUtils
Aug 2, 2025
ae845ce
style
Aug 2, 2025
49815e2
update test that uses illegal schema for record
Aug 2, 2025
5fa01e2
address review comments
Aug 4, 2025
a31899b
refactor to get rid of most caches and make code better
Aug 4, 2025
65f6c00
address review comment and fix some things
Aug 4, 2025
dbb3f06
address review comments
Aug 5, 2025
15fbf03
add tests for the new hive avro serializer methods and also make the …
Aug 5, 2025
77825b4
fix parquet/orc extension check. Add schema evolution on log file che…
Aug 6, 2025
ea0bb44
fix checkstyle
Aug 6, 2025
560d38c
Merge branch 'master' into fix_schema_on_write_hive
Aug 10, 2025
96f114f
Merge branch 'master' into fix_schema_on_write_hive
Aug 11, 2025
efb6058
reuse code for validation of projection equivalence
Aug 11, 2025
52a99e1
address some review comments
Aug 12, 2025
4c10c4e
Revert "address some review comments"
yihua Aug 13, 2025
e24a2e3
Add test case
yihua Aug 13, 2025
4a5339b
Only use equivalent check for read-only utils
yihua Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
Expand Down Expand Up @@ -58,7 +57,6 @@
public class TestHoodieFileGroupReaderOnHive extends HoodieFileGroupReaderOnJavaTestBase<ArrayWritable> {

private static final String PARTITION_COLUMN = "datestr";
private static JobConf baseJobConf;
private static HdfsTestService hdfsTestService;
private static HoodieStorage storage;
private static FileSystem fs;
Expand All @@ -74,7 +72,7 @@ public static void setUpClass() throws IOException {
hdfsTestService = new HdfsTestService();
fs = hdfsTestService.start(true).getFileSystem();
storageConf = HoodieTestUtils.getDefaultStorageConf();
baseJobConf = new JobConf(storageConf.unwrap());
JobConf baseJobConf = new JobConf(storageConf.unwrap());
baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024));
fs.setConf(baseJobConf);
storage = new HoodieHadoopStorage(fs);
Expand All @@ -100,8 +98,7 @@ public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePat
JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
setupJobconf(jobConf, avroSchema);
return new HiveHoodieReaderContext(readerCreator,
getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
new ObjectInspectorCache(avroSchema, jobConf), storageConf, metaClient.getTableConfig());
getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema), storageConf, metaClient.getTableConfig());
}

@Override
Expand All @@ -116,24 +113,7 @@ public void assertRecordMatchesSchema(Schema schema, ArrayWritable record) {

@Override
public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() {
HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs();
configs.nestedSupport = false;
configs.arraySupport = false;
configs.mapSupport = false;
configs.addNewFieldSupport = false;
configs.intToLongSupport = false;
configs.intToFloatSupport = false;
configs.intToDoubleSupport = false;
configs.intToStringSupport = false;
configs.longToFloatSupport = false;
configs.longToDoubleSupport = false;
configs.longToStringSupport = false;
configs.floatToDoubleSupport = false;
configs.floatToStringSupport = false;
configs.doubleToStringSupport = false;
configs.stringToBytesSupport = false;
configs.bytesToStringSupport = false;
return configs;
return new HoodieTestDataGenerator.SchemaEvolutionConfigs();
}

private void setupJobconf(JobConf jobConf, Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,15 @@ public static void assertArrayWritableMatchesSchema(Schema schema, Writable writ
case UNION:
if (schema.getTypes().size() == 2
&& schema.getTypes().get(0).getType() == Schema.Type.NULL) {
if (writable == null || writable instanceof NullWritable) {
return;
}
assertArrayWritableMatchesSchema(schema.getTypes().get(1), writable);
} else if (schema.getTypes().size() == 2
&& schema.getTypes().get(1).getType() == Schema.Type.NULL) {
if (writable == null || writable instanceof NullWritable) {
return;
}
assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
} else if (schema.getTypes().size() == 1) {
assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.avro;

import org.apache.avro.Schema;

import java.util.List;

import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;

public class AvroSchemaComparatorForRecordProjection extends AvroSchemaComparatorForSchemaEvolution {

private static final AvroSchemaComparatorForRecordProjection INSTANCE = new AvroSchemaComparatorForRecordProjection();

/**
 * Checks whether two Avro schemas are equivalent for the purpose of record
 * projection: nullable-union wrappers are unwrapped before comparison and
 * field names are compared case-insensitively (see the overrides below).
 *
 * @param s1 first schema to compare (may be null)
 * @param s2 second schema to compare (may be null)
 * @return true if the two schemas are projection-equivalent
 */
public static boolean areSchemasProjectionEquivalent(Schema s1, Schema s2) {
// Delegate to the stateless shared instance; the actual comparison rules
// live in the overridden methods of this subclass.
return INSTANCE.schemaEqualsInternal(s1, s2);
}

@Override
protected boolean schemaEqualsInternal(Schema s1, Schema s2) {
  // Identity covers the both-null case; a single null can never match.
  if (s1 == s2) {
    return true;
  }
  if (s1 == null) {
    return false;
  }
  if (s2 == null) {
    return false;
  }
  // Strip any nullable-union wrapper so that "T" and "union(null, T)"
  // compare as the same underlying type.
  Schema left = resolveNullableSchema(s1);
  Schema right = resolveNullableSchema(s2);
  return super.schemaEqualsInternal(left, right);
}

// Record-level metadata checks performed by the parent comparator (error flag
// and logical type) are intentionally skipped for projection equivalence;
// only the record's fields matter here.
@Override
protected boolean validateRecord(Schema s1, Schema s2) {
return true;
}

@Override
protected boolean validateField(Schema.Field f1, Schema.Field f2) {
return f1.name().equalsIgnoreCase(f2.name());
Copy link
Contributor

@yihua yihua Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended for case insensitivity of column names?

}

@Override
protected boolean enumSchemaEquals(Schema s1, Schema s2) {
  // For projection, s1's symbols must be a case-insensitive positional
  // prefix of s2's symbols; enum type names are intentionally not compared.
  List<String> projectedSymbols = s1.getEnumSymbols();
  List<String> sourceSymbols = s2.getEnumSymbols();
  if (sourceSymbols.size() < projectedSymbols.size()) {
    return false;
  }
  int position = 0;
  for (String symbol : projectedSymbols) {
    if (!symbol.equalsIgnoreCase(sourceSymbols.get(position))) {
      return false;
    }
    position++;
  }
  return true;
}

// Nullable unions are already unwrapped by schemaEqualsInternal (via
// resolveNullableSchema) before type dispatch, so reaching this method means
// a non-nullable union was encountered, which projection equivalence does
// not support.
@Override
protected boolean unionSchemaEquals(Schema s1, Schema s2) {
throw new UnsupportedOperationException("union not supported for projection equivalence");
}

// For projection equivalence only the byte size of FIXED schemas must match;
// the schema name check performed by the parent comparator is dropped.
@Override
protected boolean validateFixed(Schema s1, Schema s2) {
return s1.getFixedSize() == s2.getFixedSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.avro.Schema;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -140,10 +138,16 @@
*/
public class AvroSchemaComparatorForSchemaEvolution {

private AvroSchemaComparatorForSchemaEvolution() {
protected AvroSchemaComparatorForSchemaEvolution() {
}

private static final AvroSchemaComparatorForSchemaEvolution VALIDATOR = new AvroSchemaComparatorForSchemaEvolution();

public static boolean schemaEquals(Schema s1, Schema s2) {
return VALIDATOR.schemaEqualsInternal(s1, s2);
}

protected boolean schemaEqualsInternal(Schema s1, Schema s2) {
if (s1 == s2) {
return true;
}
Expand Down Expand Up @@ -181,35 +185,35 @@ public static boolean schemaEquals(Schema s1, Schema s2) {
}
}

private static boolean recordSchemaEquals(Schema s1, Schema s2) {
/**
 * Validates record-level metadata of two RECORD schemas: both (or neither)
 * must be error records, and their logical-type annotations must match.
 * Field comparison is handled separately by the caller.
 */
protected boolean validateRecord(Schema s1, Schema s2) {
  boolean sameErrorFlag = s1.isError() == s2.isError();
  return sameErrorFlag && logicalTypeSchemaEquals(s1, s2);
}

private boolean recordSchemaEquals(Schema s1, Schema s2) {
if (!validateRecord(s1, s2)) {
return false;
}

List<Schema.Field> fields1 = s1.getFields();
List<Schema.Field> fields2 = s2.getFields();

if (fields1.size() != fields2.size()) {
return false;
}

Map<String, Schema.Field> fieldMap1 = fields1.stream()
.collect(Collectors.toMap(Schema.Field::name, Function.identity()));

for (Schema.Field f2 : fields2) {
Schema.Field f1 = fieldMap1.get(f2.name());
if (f1 == null) {
return false;
}
if (!fieldEquals(f1, f2)) {
for (int i = 0; i < fields1.size(); i++) {
if (!fieldEquals(fields1.get(i), fields2.get(i))) {
return false;
}
}

return logicalTypeSchemaEquals(s1, s2);
return true;
}

private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) {
protected boolean validateField(Schema.Field f1, Schema.Field f2) {
if (!f1.name().equals(f2.name())) {
return false;
}
Expand All @@ -223,10 +227,6 @@ private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) {
return false;
}

if (!schemaEquals(f1.schema(), f2.schema())) {
return false;
}

// If both have default values, they must be equal
if (f1.hasDefaultValue() && !f1.defaultVal().equals(f2.defaultVal())) {
return false;
Expand All @@ -235,7 +235,15 @@ private static boolean fieldEquals(Schema.Field f1, Schema.Field f2) {
return true;
}

private static boolean enumSchemaEquals(Schema s1, Schema s2) {
/**
 * Two fields are equal when their metadata passes {@code validateField} and
 * their schemas are recursively equal.
 */
private boolean fieldEquals(Schema.Field f1, Schema.Field f2) {
  return validateField(f1, f2) && schemaEqualsInternal(f1.schema(), f2.schema());
}

protected boolean enumSchemaEquals(Schema s1, Schema s2) {
// Check name equality first
if (!s1.getName().equals(s2.getName())) {
return false;
Expand All @@ -252,7 +260,7 @@ private static boolean enumSchemaEquals(Schema s1, Schema s2) {
return symbols1.equals(symbols2);
}

private static boolean unionSchemaEquals(Schema s1, Schema s2) {
protected boolean unionSchemaEquals(Schema s1, Schema s2) {
List<Schema> types1 = s1.getTypes();
List<Schema> types2 = s2.getTypes();

Expand All @@ -268,17 +276,23 @@ private static boolean unionSchemaEquals(Schema s1, Schema s2) {
return set1.equals(set2);
}

private static boolean arraySchemaEquals(Schema s1, Schema s2) {
return schemaEquals(s1.getElementType(), s2.getElementType());
// ARRAY schemas are equal when their element schemas are recursively equal.
private boolean arraySchemaEquals(Schema s1, Schema s2) {
  Schema elementType1 = s1.getElementType();
  Schema elementType2 = s2.getElementType();
  return schemaEqualsInternal(elementType1, elementType2);
}

// MAP schemas are equal when their value schemas are recursively equal
// (Avro map keys are always strings, so only values need comparing).
private boolean mapSchemaEquals(Schema s1, Schema s2) {
  Schema valueType1 = s1.getValueType();
  Schema valueType2 = s2.getValueType();
  return schemaEqualsInternal(valueType1, valueType2);
}

private static boolean mapSchemaEquals(Schema s1, Schema s2) {
return schemaEquals(s1.getValueType(), s2.getValueType());
// Structural checks for FIXED schemas: the schema name and the fixed byte
// size must both match. Logical-type equality is checked separately by
// fixedSchemaEquals. Overridable so subclasses can relax these rules.
protected boolean validateFixed(Schema s1, Schema s2) {
return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize();
}

private static boolean fixedSchemaEquals(Schema s1, Schema s2) {
return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize()
&& logicalTypeSchemaEquals(s1, s2);
// FIXED schemas are equal when the structural checks pass and their
// logical-type annotations also match.
private boolean fixedSchemaEquals(Schema s1, Schema s2) {
  return validateFixed(s1, s2) && logicalTypeSchemaEquals(s1, s2);
}

private static boolean primitiveSchemaEquals(Schema s1, Schema s2) {
Expand Down
Loading
Loading