Merged

62 commits
26a3d7a
add non type promotion scenarios
Sep 16, 2023
57148da
add type promotion and table services
Sep 18, 2023
d07690e
clean up a bit
Sep 18, 2023
097ef61
add testing for nested and complex promotion, as well as non-nullable…
Sep 18, 2023
e6050ef
fix for compaction
Sep 18, 2023
9365e3b
add pull #9571 to this pr
Sep 19, 2023
fcfd0d2
add reconcile tests and make build
Sep 19, 2023
4e952ab
fix clustering mor
Sep 22, 2023
b6a9a29
fix test for both record type
Sep 22, 2023
f3d8462
clustering type promotion mostly working:
Sep 27, 2023
7e06b0f
add to string type promotion
Sep 27, 2023
d71288b
add multiple filegroups and multiple log files to non type promotion …
Sep 28, 2023
3e335ff
do refactoring
Sep 28, 2023
0d0c413
add extra scenarios to type promo
Sep 28, 2023
9e0134b
add more docs, and fix one of the tests
Sep 28, 2023
f627d9b
2 impls for fixing string type promo. Need to do perf tests
Oct 2, 2023
56aa98d
add auto promotion for demoted inputs
Oct 3, 2023
556e762
fix drop column support
Oct 4, 2023
c1ca876
all schema evolutions can be done at the same time
Oct 5, 2023
724e42e
apply delete block change to file slice reader
Oct 5, 2023
852a37f
Merge branch 'master' into out_of_box_schema_evolution
Oct 5, 2023
70c2646
fix byte-string stuff
Oct 5, 2023
51fabad
for returning the source schema, we need to convert to internal schem…
Oct 6, 2023
20243ac
don't convert to internal row and then convert back
Oct 6, 2023
908ae8d
fix failing tests
Oct 10, 2023
f544ec5
Merge branch 'master' into out_of_box_schema_evolution
Oct 12, 2023
8a2aab4
some changes
Oct 12, 2023
8b312ef
address most of the review feedback
Oct 12, 2023
c81e51e
Merge branch 'apache:master' into out_of_box_schema_evolution
jonvex Oct 16, 2023
51c94f4
fix failing tests
Oct 16, 2023
a5aae57
address comments
Oct 16, 2023
7aa5e40
fix backwards compat
Oct 17, 2023
ce8a559
adding a utility api to check if type is numeric
Oct 17, 2023
77a3f30
Handling null schema provider and fixing the failing test cases.
Oct 17, 2023
66df705
Fixing checkstyling
Oct 17, 2023
ec853e3
isTypeNumeric to exclude byte type
Oct 17, 2023
9e8e32c
Fixing syntax issue
Oct 17, 2023
bc45850
Adding a test case that is uncovering a bug in fixNullOrdering api
Oct 18, 2023
e32b58f
add CI testing
Oct 18, 2023
8826dfc
Fixing schema to the way our internal convertor work (this is the api…
Oct 18, 2023
0fb70b3
fix nested struct multiple evolution
Oct 18, 2023
7750c36
Handling null schema in fixNullOrdering API
Oct 18, 2023
95bc5f3
Fixing missing semi colon
Oct 18, 2023
0848706
fix scala 2.11 compat issue
Oct 19, 2023
bf43b05
serialize avro schema
Oct 19, 2023
d369795
Return None if schema.on.read is not enabled in getLatestTableInterna…
Oct 19, 2023
0fe4d74
fix for async clustering
Oct 19, 2023
7c353cd
account for alter schema commit
Oct 24, 2023
f98cbcb
change async table fix to be exclusive instead of inclusive
Oct 25, 2023
661b169
Merge branch 'apache:master' into out_of_box_schema_evolution
jonvex Oct 25, 2023
43f9d0b
Fixing schema to be evolved wrt to table schema when target schema is…
lokesh-lingarajan-0310 Oct 26, 2023
56ed4a1
Fixing schema deduce for target schema provider code path
lokesh-lingarajan-0310 Oct 27, 2023
7fed094
add avrokafka source and transformers and schemaprovider to the tests
Oct 28, 2023
16128cd
Addressing feedback from Siva
nsivabalan Nov 4, 2023
bfe8edd
Add tests
lokeshj1703 Nov 8, 2023
58d6194
Fix test failure with testTypePromotion
lokeshj1703 Nov 8, 2023
db06d7f
Resolving merge conflicts
nsivabalan Nov 9, 2023
d3e58fe
Fixing clean up of resources in tests
nsivabalan Nov 9, 2023
4c36398
disabling schema evol tests
nsivabalan Nov 9, 2023
a1ab15d
Fixing build failure
nsivabalan Nov 9, 2023
b8cab7a
reverting disabling tests
nsivabalan Nov 9, 2023
a21058e
Fixing test failures
nsivabalan Nov 9, 2023
AvroConversionUtils.scala
@@ -182,27 +182,24 @@ object AvroConversionUtils {
} else {
field.doc()
}
val newSchema = getAvroSchemaWithDefaults(field.schema(), structFields(i).dataType)
field.schema().getType match {
case Schema.Type.UNION => {
val innerFields = newSchema.getTypes
val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
if(containsNullSchema) {
// Need to re shuffle the fields in list because to set null as default, null schema must be head in union schema
val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
new Schema.Field(field.name(), restructuredNewSchema, comment, JsonProperties.NULL_VALUE)
} else {
new Schema.Field(field.name(), newSchema, comment, field.defaultVal())
}
}
case _ => new Schema.Field(field.name(), newSchema, comment, field.defaultVal())
//need special handling for union because we update field default to null if it's in the union
val (newSchema, containsNullSchema) = field.schema().getType match {
case Schema.Type.UNION => resolveUnion(field.schema(), structFields(i).dataType)
case _ => (getAvroSchemaWithDefaults(field.schema(), structFields(i).dataType), false)
}
new Schema.Field(field.name(), newSchema, comment,
if (containsNullSchema) {
JsonProperties.NULL_VALUE
} else {
field.defaultVal()
})
}).toList
Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
}

case Schema.Type.UNION => {
Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema, dataType)))
val (resolved, _) = resolveUnion(schema, dataType)
resolved
}

case Schema.Type.MAP => {
@@ -217,6 +214,25 @@ object AvroConversionUtils {
}
}

/**
* Helper for getAvroSchemaWithDefaults when the schema type is a union.
* Re-arranges the branches so that null comes first if it is present in the union.
*
* @param schema   input Avro union schema
* @param dataType corresponding Spark data type
* @return the resolved Avro schema, and a boolean that is true if the union contains null
*/
private def resolveUnion(schema: Schema, dataType: DataType): (Schema, Boolean) = {
val innerFields = schema.getTypes
val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
(if (containsNullSchema) {
Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL))
.map(innerSchema => getAvroSchemaWithDefaults(innerSchema, dataType)))
} else {
Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema, dataType)))
}, containsNullSchema)
}

/**
* Please use [[AvroSchemaUtils.getAvroRecordQualifiedName(String)]]
*/
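Aside: the union handling above exists because Avro only accepts a null field default when NULL is the first branch of the union, which is why resolveUnion moves the NULL schema to the head before the field default is set to JsonProperties.NULL_VALUE. The snippet below is a minimal standalone sketch of that Avro behavior, not code from this PR; the class name is made up for illustration, and it assumes an Avro release that validates field defaults when the Field is constructed.

import java.util.Arrays;

import org.apache.avro.AvroTypeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;

// Hypothetical demo class, not part of the PR.
public class NullFirstUnionSketch {
  public static void main(String[] args) {
    // Union with NULL listed last: a null default is rejected.
    Schema nullLast = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
    // Union re-ordered so NULL comes first: a null default is accepted.
    Schema nullFirst = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));

    try {
      // Avro checks the default value against the first union branch, so this throws.
      new Schema.Field("bad", nullLast, null, JsonProperties.NULL_VALUE);
    } catch (AvroTypeException e) {
      System.out.println("null-last union rejects null default: " + e.getMessage());
    }

    // With NULL at the head of the union, the null default is accepted.
    Schema.Field ok = new Schema.Field("ok", nullFirst, null, JsonProperties.NULL_VALUE);
    System.out.println("null-first union accepts null default: " + ok.defaultVal());
  }
}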
AvroSchemaCompatibility.java
@@ -43,6 +43,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;

import static org.apache.hudi.avro.HoodieAvroUtils.isTypeNumeric;
import static org.apache.hudi.common.util.ValidationUtils.checkState;

/**
@@ -62,10 +63,15 @@
public class AvroSchemaCompatibility {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchemaCompatibility.class);

/** Utility class cannot be instantiated. */
private AvroSchemaCompatibility() {}
/**
* Utility class cannot be instantiated.
*/
private AvroSchemaCompatibility() {
}

/** Message to annotate reader/writer schema pairs that are compatible. */
/**
* Message to annotate reader/writer schema pairs that are compatible.
*/
public static final String READER_WRITER_COMPATIBLE_MESSAGE = "Reader schema can always successfully decode data written using the writer schema.";

/**
@@ -161,7 +167,7 @@ public static Field lookupWriterField(final Schema writerSchema, final Field rea

/**
* Reader/writer schema pair that can be used as a key in a hash map.
*
* <p>
* This reader/writer pair differentiates Schema objects based on their system
* hash code.
*/
@@ -180,13 +186,17 @@ public ReaderWriter(final Schema reader, final Schema writer) {
mWriter = writer;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return System.identityHashCode(mReader) ^ System.identityHashCode(mWriter);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ReaderWriter)) {
@@ -197,7 +207,9 @@ public boolean equals(Object obj) {
return (this.mReader == that.mReader) && (this.mWriter == that.mWriter);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format("ReaderWriter{reader:%s, writer:%s}", mReader, mWriter);
@@ -279,8 +291,8 @@ private SchemaCompatibilityResult getCompatibility(final Schema reader,
* {@link #getCompatibility(Schema, Schema)}.
* </p>
*
* @param reader Reader schema to test.
* @param writer Writer schema to test.
* @param reader Reader schema to test.
* @param writer Writer schema to test.
* @param locations Stack with which to track the location within the schema.
* @return the compatibility of the reader/writer schema pair.
*/
@@ -372,7 +384,8 @@ private SchemaCompatibilityResult calculateCompatibility(final Schema reader, fi
return (writer.getType() == Type.STRING) ? result : result.mergedWith(typeMismatch(reader, writer, locations));
}
case STRING: {
return (writer.getType() == Type.BYTES) ? result : result.mergedWith(typeMismatch(reader, writer, locations));
return (isTypeNumeric(writer.getType()) || (writer.getType() == Schema.Type.BYTES)
? result : result.mergedWith(typeMismatch(reader, writer, locations)));
}

case ARRAY:
@@ -540,7 +553,9 @@ private static List<String> asList(Deque<LocationInfo> deque) {
public enum SchemaCompatibilityType {
COMPATIBLE, INCOMPATIBLE,

/** Used internally to tag a reader/writer schema pair and prevent recursion. */
/**
* Used internally to tag a reader/writer schema pair and prevent recursion.
*/
RECURSION_IN_PROGRESS;
}

@@ -565,7 +580,7 @@ public static final class SchemaCompatibilityResult {
* @param toMerge The {@code SchemaCompatibilityResult} to merge with the
* current instance.
* @return A {@code SchemaCompatibilityResult} that combines the state of the
* current and supplied instances.
* current and supplied instances.
*/
public SchemaCompatibilityResult mergedWith(SchemaCompatibilityResult toMerge) {
List<Incompatibility> mergedIncompatibilities = new ArrayList<>(mIncompatibilities);
@@ -595,7 +610,7 @@ private SchemaCompatibilityResult(SchemaCompatibilityType compatibilityType,
* Returns a details object representing a compatible schema pair.
*
* @return a SchemaCompatibilityDetails object with COMPATIBLE
* SchemaCompatibilityType, and no other state.
* SchemaCompatibilityType, and no other state.
*/
public static SchemaCompatibilityResult compatible() {
return COMPATIBLE;
@@ -606,7 +621,7 @@ public static SchemaCompatibilityResult compatible() {
* progress.
*
* @return a SchemaCompatibilityDetails object with RECURSION_IN_PROGRESS
* SchemaCompatibilityType, and no other state.
* SchemaCompatibilityType, and no other state.
*/
public static SchemaCompatibilityResult recursionInProgress() {
return RECURSION_IN_PROGRESS;
@@ -617,7 +632,7 @@ public static SchemaCompatibilityResult recursionInProgress() {
* error details.
*
* @return a SchemaCompatibilityDetails object with INCOMPATIBLE
* SchemaCompatibilityType, and state representing the violating part.
* SchemaCompatibilityType, and state representing the violating part.
*/
public static SchemaCompatibilityResult incompatible(SchemaIncompatibilityType incompatibilityType,
Schema readerFragment, Schema writerFragment, String message, List<String> location) {
@@ -641,13 +656,15 @@ public SchemaCompatibilityType getCompatibility() {
* Incompatibilities} found, otherwise an empty list.
*
* @return a list of {@link Incompatibility Incompatibilities}, may be empty,
* never null.
* never null.
*/
public List<Incompatibility> getIncompatibilities() {
return mIncompatibilities;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
final int prime = 31;
@@ -657,7 +674,9 @@ public int hashCode() {
return result;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -680,7 +699,9 @@ public boolean equals(Object obj) {
return mCompatibilityType == other.mCompatibilityType;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format("SchemaCompatibilityResult{compatibility:%s, incompatibilities:%s}", mCompatibilityType,
@@ -737,8 +758,8 @@ public Schema getWriterFragment() {
* Returns a human-readable message with more details about what failed. Syntax
* depends on the SchemaIncompatibilityType.
*
* @see #getType()
* @return a String with details about the incompatibility.
* @see #getType()
*/
public String getMessage() {
return mMessage;
@@ -768,7 +789,9 @@ public String getLocation() {
return s.toString();
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
final int prime = 31;
@@ -781,7 +804,9 @@ public int hashCode() {
return result;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -825,7 +850,9 @@ public boolean equals(Object obj) {
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format("Incompatibility{type:%s, location:%s, message:%s, reader:%s, writer:%s}", mType,
@@ -837,21 +864,29 @@ public String toString() {
/**
* Provides information about the compatibility of a single reader and writer
* schema pair.
*
* <p>
* Note: This class represents a one-way relationship from the reader to the
* writer schema.
*/
public static final class SchemaPairCompatibility {
/** The details of this result. */
/**
* The details of this result.
*/
private final SchemaCompatibilityResult mResult;

/** Validated reader schema. */
/**
* Validated reader schema.
*/
private final Schema mReader;

/** Validated writer schema. */
/**
* Validated writer schema.
*/
private final Schema mWriter;

/** Human readable description of this result. */
/**
* Human readable description of this result.
*/
private final String mDescription;

/**
@@ -915,14 +950,18 @@ public String getDescription() {
return mDescription;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return String.format("SchemaPairCompatibility{result:%s, readerSchema:%s, writerSchema:%s, description:%s}",
mResult, mReader, mWriter, mDescription);
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object other) {
if ((other instanceof SchemaPairCompatibility)) {
Expand All @@ -934,14 +973,18 @@ public boolean equals(Object other) {
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return Arrays.hashCode(new Object[] { mResult, mReader, mWriter, mDescription });
return Arrays.hashCode(new Object[] {mResult, mReader, mWriter, mDescription});
}
}

/** Borrowed from Guava's Objects.equal(a, b) */
/**
* Borrowed from Guava's Objects.equal(a, b)
*/
private static boolean objectsEqual(Object obj1, Object obj2) {
return Objects.equals(obj1, obj2);
}
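Aside: the calculateCompatibility change above relaxes the reader-side STRING case so that, in addition to BYTES, numeric writer types are treated as promotable to string. The snippet below is a minimal standalone sketch of that rule, not this PR's code path; isTypeNumeric here is a hypothetical stand-in for HoodieAvroUtils.isTypeNumeric, whose exact set of accepted types may differ (one commit notes it excludes the byte type).

import org.apache.avro.Schema;

// Hypothetical demo class, not part of the PR.
public class StringPromotionSketch {

  // Assumed mirror of HoodieAvroUtils.isTypeNumeric; the real helper may
  // cover a slightly different set of types.
  static boolean isTypeNumeric(Schema.Type type) {
    return type == Schema.Type.INT || type == Schema.Type.LONG
        || type == Schema.Type.FLOAT || type == Schema.Type.DOUBLE;
  }

  // Reader expects STRING: accept numeric or BYTES writer types, reject the rest.
  static boolean readerStringAcceptsWriter(Schema.Type writerType) {
    return isTypeNumeric(writerType) || writerType == Schema.Type.BYTES;
  }

  public static void main(String[] args) {
    System.out.println(readerStringAcceptsWriter(Schema.Type.LONG));    // true
    System.out.println(readerStringAcceptsWriter(Schema.Type.BYTES));   // true
    System.out.println(readerStringAcceptsWriter(Schema.Type.BOOLEAN)); // false
  }
}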