Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker/compose/docker-compose_hadoop284_hive233_spark244.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ services:
- "namenode"

zookeeper:
image: 'bitnami/zookeeper:3.4.12-r68'
image: 'bitnamilegacy/zookeeper:3.4.12-r68'
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -184,7 +184,7 @@ services:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
image: 'bitnami/kafka:2.0.0'
image: 'bitnamilegacy/kafka:2.0.0'
hostname: kafkabroker
container_name: kafkabroker
ports:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.sql.types.VarcharType;

import java.lang.reflect.Field;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Deque;
Expand All @@ -82,6 +83,21 @@ private SparkInternalSchemaConverter() {
public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";

/**
* Get TimestampNTZType$ using reflection, as it's only available in Spark 3.3+.
* Falls back to TimestampType$ if TimestampNTZType is not available.
*/
private static DataType getTimestampNTZType() {
try {
Class<?> timestampNTZTypeClass = Class.forName("org.apache.spark.sql.types.TimestampNTZType$");
Field moduleField = timestampNTZTypeClass.getField("MODULE$");
return (DataType) moduleField.get(null);
} catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
// TimestampNTZType is not available in this Spark version, fall back to TimestampType
return TimestampType$.MODULE$;
}
}

public static Type buildTypeFromStructType(DataType sparkType, Boolean firstVisitRoot, AtomicInteger nextId) {
if (sparkType instanceof StructType) {
StructField[] fields = ((StructType) sparkType).fields();
Expand Down Expand Up @@ -267,10 +283,14 @@ private static DataType constructSparkSchemaFromType(Type type) {
case DATE:
return DateType$.MODULE$;
case TIME:
case TIME_MILLIS:
throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
case TIMESTAMP:
// todo support TimeStampNTZ
case TIMESTAMP_MILLIS:
return TimestampType$.MODULE$;
case LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_MICROS:
return getTimestampNTZType();
case STRING:
return StringType$.MODULE$;
case UUID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
Expand All @@ -58,6 +57,7 @@

import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {

Expand All @@ -71,7 +71,7 @@ Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
(Configuration) table.getStorageConf().unwrap(), new Path(sourceFilePath.toUri()),
ParquetMetadataConverter.NO_FILTER);
MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
return new AvroSchemaConverter().convert(parquetSchema);
return getAvroSchemaConverter((Configuration) table.getStorageConf().unwrap()).convert(parquetSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ trait SparkAdapter extends Serializable {
*/
def isColumnarBatchRow(r: InternalRow): Boolean

def isTimestampNTZType(dataType: DataType): Boolean

/**
* Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
* [[METADATA_COL_ATTR_KEY]] if available (available in Spark >= 3.2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.avro;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.InvalidUnionTypeException;
import org.apache.hudi.exception.MissingSchemaFieldException;
Expand Down Expand Up @@ -199,6 +201,25 @@ private static boolean isProjectionOfInternal(Schema sourceSchema,
return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
}

public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldName) {
if (StringUtils.isNullOrEmpty(fieldName)) {
return Option.empty();
}
String[] parts = fieldName.split("\\.");
for (String part : parts) {
Schema.Field foundField = resolveNullableSchema(schema).getField(part);
if (foundField == null) {
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
}
schema = foundField.schema();
}
return Option.of(resolveNullableSchema(schema));
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
}

/**
* Appends provided new fields at the end of the given schema
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;

import java.lang.reflect.Constructor;
import java.util.Map;

/**
Expand All @@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData {
private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();

// NOTE: Those are not supported in Avro 1.8.2
// TODO re-enable upon upgrading to 1.10
// private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
// private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
// private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
// private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();

// NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
// Use reflection to conditionally initialize them only if available
private static final Object TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");
private static final Object TIME_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimeMillisConversion");
private static final Object LOCAL_TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$LocalTimestampMillisConversion");
private static final Object LOCAL_TIMESTAMP_MICROS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$LocalTimestampMicrosConversion");
public static final GenericData INSTANCE = new ConvertingGenericData();

private ConvertingGenericData() {
Expand All @@ -57,12 +57,20 @@ private ConvertingGenericData() {
addLogicalTypeConversion(DATE_CONVERSION);
addLogicalTypeConversion(TIME_MICROS_CONVERSION);
addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
// NOTE: Those are not supported in Avro 1.8.2
// TODO re-enable upon upgrading to 1.10
// addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
// addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
// addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
// addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
// NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
// Only add conversions if they're available
if (TIME_MILLIS_CONVERSION != null) {
addLogicalTypeConversionReflectively(TIME_MILLIS_CONVERSION);
}
if (TIMESTAMP_MILLIS_CONVERSION != null) {
addLogicalTypeConversionReflectively(TIMESTAMP_MILLIS_CONVERSION);
}
if (LOCAL_TIMESTAMP_MILLIS_CONVERSION != null) {
addLogicalTypeConversionReflectively(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
}
if (LOCAL_TIMESTAMP_MICROS_CONVERSION != null) {
addLogicalTypeConversionReflectively(LOCAL_TIMESTAMP_MICROS_CONVERSION);
}
}

@Override
Expand Down Expand Up @@ -125,9 +133,31 @@ public boolean validate(Schema schema, Object datum) {
return isInteger(datum)
|| DATE_CONVERSION.getConvertedType().isInstance(datum);
case LONG:
return isLong(datum)
|| TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
|| TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum);
if (isLong(datum)) {
return true;
}
if (TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
|| TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)) {
return true;
}
// Check optional conversions that may not be available in Avro 1.8.2
Class<?> convertedType;
if (TIMESTAMP_MILLIS_CONVERSION != null
&& (convertedType = getConvertedType(TIMESTAMP_MILLIS_CONVERSION)) != null
&& convertedType.isInstance(datum)) {
return true;
}
if (LOCAL_TIMESTAMP_MICROS_CONVERSION != null
&& (convertedType = getConvertedType(LOCAL_TIMESTAMP_MICROS_CONVERSION)) != null
&& convertedType.isInstance(datum)) {
return true;
}
if (LOCAL_TIMESTAMP_MILLIS_CONVERSION != null
&& (convertedType = getConvertedType(LOCAL_TIMESTAMP_MILLIS_CONVERSION)) != null
&& convertedType.isInstance(datum)) {
return true;
}
return false;
case FLOAT:
return isFloat(datum);
case DOUBLE:
Expand All @@ -140,5 +170,43 @@ public boolean validate(Schema schema, Object datum) {
return false;
}
}

/**
* Creates a conversion instance using reflection if the class is available.
* Returns null if the class doesn't exist (e.g., in Avro 1.8.2).
*/
private static Object createConversionIfAvailable(String className) {
try {
Class<?> clazz = Class.forName(className);
Constructor<?> constructor = clazz.getConstructor();
return constructor.newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
| IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
// Class doesn't exist or can't be instantiated (e.g., Avro 1.8.2)
return null;
}
}

/**
* Gets the converted type from a conversion object using reflection.
*/
private static Class<?> getConvertedType(Object conversion) {
try {
return (Class<?>) conversion.getClass().getMethod("getConvertedType").invoke(conversion);
} catch (Exception e) {
// Should not happen if conversion is valid, but handle gracefully
return null;
}
}

/**
* Adds a logical type conversion using unchecked cast to avoid compile-time dependency
* on classes that may not exist in older Avro versions.
*/
private void addLogicalTypeConversionReflectively(Object conversion) {
// Cast to Conversion<?> since we know it's a Conversion if not null
// This avoids compile-time dependency on specific Conversion subclasses
addLogicalTypeConversion((org.apache.avro.Conversion<?>) conversion);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
Expand All @@ -48,6 +49,7 @@
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -1040,12 +1042,33 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
case NULL:
case BOOLEAN:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BYTES:
case STRING:
return oldValue;
case LONG:
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
return DateTimeUtils.millisToMicros((Long) oldValue);
}
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
return DateTimeUtils.microsToMillis((Long) oldValue);
}
} else if (isLocalTimestampMillis(oldSchema.getLogicalType())) {
if (isLocalTimestampMicros(newSchema.getLogicalType())) {
return DateTimeUtils.millisToMicros((Long) oldValue);
}
} else if (isLocalTimestampMicros(oldSchema.getLogicalType())) {
if (isLocalTimestampMillis(newSchema.getLogicalType())) {
return DateTimeUtils.microsToMillis((Long) oldValue);
}
}
throw new HoodieAvroSchemaException("Long type logical change from " + oldSchema.getLogicalType() + " to " + newSchema.getLogicalType() + " is not supported");
}
return oldValue;
case FIXED:
if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
// Check whether this is a [[Decimal]]'s precision change
Expand Down Expand Up @@ -1436,4 +1459,38 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
}
}

/**
* Checks if a logical type is an instance of LocalTimestampMillis using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
private static boolean isLocalTimestampMillis(LogicalType logicalType) {
if (logicalType == null) {
return false;
}
try {
Class<?> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
return localTimestampMillisClass.isInstance(logicalType);
} catch (ClassNotFoundException e) {
// Class doesn't exist (e.g., Avro 1.8.2)
return false;
}
}

/**
* Checks if a logical type is an instance of LocalTimestampMicros using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
private static boolean isLocalTimestampMicros(LogicalType logicalType) {
if (logicalType == null) {
return false;
}
try {
Class<?> localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros");
return localTimestampMicrosClass.isInstance(logicalType);
} catch (ClassNotFoundException e) {
// Class doesn't exist (e.g., Avro 1.8.2)
return false;
}
}

}
Loading
Loading