diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 54f1bf4a4d12a..c8fac38be5b18 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -144,12 +144,6 @@ flink-hadoop-compatibility_${scala.binary.version} ${flink.version} - - org.apache.flink - flink-avro - ${flink.version} - provided - org.apache.flink flink-parquet_${scala.binary.version} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java index df815a82e4fb7..8c9537fe3c663 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -46,7 +46,7 @@ * SQL API. * *

Note: Changes in this class need to be kept in sync with the corresponding runtime classes - * {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}. + * {@code org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@code org.apache.flink.formats.avro.AvroRowSerializationSchema}. * *

NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that. */ @@ -294,7 +294,7 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { } } - private static LogicalType extractValueTypeToAvroMap(LogicalType type) { + public static LogicalType extractValueTypeToAvroMap(LogicalType type) { LogicalType keyType; LogicalType valueType; if (type instanceof MapType) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index 1ce467f5467e4..d903632a6bec5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -49,8 +49,6 @@ import java.util.List; import java.util.Map; -import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap; - /** * Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. * @@ -188,7 +186,7 @@ private static AvroToRowDataConverter createMapConverter(LogicalType type) { final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType()); final AvroToRowDataConverter valueConverter = - createNullableConverter(extractValueTypeToAvroMap(type)); + createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type)); return avroObject -> { final Map map = (Map) avroObject; diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java index c282f5aebb812..26e21770b1680 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -39,8 +39,6 @@ import java.util.List; import java.util.Map; -import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap; - /** * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}. * @@ -279,7 +277,7 @@ public Object convert(Schema schema, Object object) { } private static RowDataToAvroConverter createMapConverter(LogicalType type) { - LogicalType valueType = extractValueTypeToAvroMap(type); + LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type); final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); final RowDataToAvroConverter valueConverter = createConverter(valueType); diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 7cb0e00a76669..443eb42cd3a08 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -130,7 +130,6 @@ com.esotericsoftware:kryo-shaded org.apache.flink:flink-hadoop-compatibility_${scala.binary.version} - org.apache.flink:flink-avro org.apache.flink:flink-json org.apache.flink:flink-parquet_${scala.binary.version} @@ -223,6 +222,11 @@ org.eclipse.jetty. ${flink.bundle.shade.prefix}org.apache.jetty. + + + com.esotericsoftware.kryo. + ${flink.bundle.shade.prefix}com.esotericsoftware.kryo. + @@ -329,12 +333,6 @@ ${flink.version} compile - - org.apache.flink - flink-avro - ${flink.version} - compile - org.apache.flink flink-parquet_${scala.binary.version} @@ -459,6 +457,10 @@ org.pentaho * + + com.esotericsoftware + kryo-shaded + @@ -575,12 +577,6 @@ 4.0.2 - - com.esotericsoftware.kryo - kryo - 2.24.0 - - org.apache.orc