diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java index 1d2786197780c..d8f79e9f6bce1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java @@ -100,7 +100,7 @@ public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMet } } - private static TreeMap getHistoricalSchemas(HoodieTableMetaClient metaClient) { + public static TreeMap getHistoricalSchemas(HoodieTableMetaClient metaClient) { TreeMap result = new TreeMap<>(); FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient); String historySchemaStr = schemasManager.getHistorySchemaStr(); diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml index 97288d19cd35c..c3a14007c2f43 100644 --- a/hudi-flink-datasource/hudi-flink/pom.xml +++ b/hudi-flink-datasource/hudi-flink/pom.xml @@ -265,6 +265,64 @@ + + + + org.apache.spark + spark-core_${scala.binary.version} + + + javax.servlet + * + + + tests + test + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.eclipse.jetty.orbit + javax.servlet + + + javax.servlet + javax.servlet-api + + + com.twitter + chill-java + + + test + + + org.apache.spark + spark-hive_${scala.binary.version} + test + + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + test + + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + test + test-jar + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + org.junit.jupiter diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8e202c692383d..0258de670cf88 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -89,6 +89,12 @@ private FlinkOptions() { + "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n" + " default false to have UPSERT semantics"); + public static final ConfigOption SCHEMA_EVOLUTION_ENABLED = ConfigOptions + .key(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key()) + .booleanType() + .defaultValue(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) + .withDescription(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.doc()); + // ------------------------------------------------------------------------ // Metadata table Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index d00eb3e3ec700..84abbb126f722 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -25,16 +25,20 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; import org.apache.hudi.source.FileIndex; import org.apache.hudi.source.IncrementalInputSplits; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.source.StreamReadOperator; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.table.format.SchemaEvoContext; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; @@ -88,6 +92,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -436,7 +441,8 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value getParquetConf(this.conf, this.hadoopConf), - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) + this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), + getSchemaEvoContext() ); format.setFilesFilter(new LatestFileFilter(this.hadoopConf)); return format; @@ -447,6 +453,17 @@ private Schema inferSchemaFromDdl() { return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); } + private Option getSchemaEvoContext() { + if (!conf.getBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED) || metaClient == null) { + return Option.empty(); + } + TreeMap schemas = InternalSchemaCache.getHistoricalSchemas(metaClient); + InternalSchema querySchema = schemas.isEmpty() + ? AvroInternalSchemaConverter.convert(getTableAvroSchema()) + : schemas.lastEntry().getValue(); + return Option.of(new SchemaEvoContext(querySchema, metaClient)); + } + @VisibleForTesting public Schema getTableAvroSchema() { try { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java new file mode 100644 index 0000000000000..9ad6de94c09eb --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.avro.Schema; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.Preconditions; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.util.AvroSchemaConverter; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +/** + * CastMap is responsible for type conversion when full schema evolution enabled. + */ +public final class CastMap implements Serializable { + // Maps position (column number) to corresponding cast + private final Map castMap = new HashMap<>(); + + /** + * Creates CastMap by comparing two schemes. Cast of a specific column is created if its type has changed. + */ + public static CastMap of(String tableName, InternalSchema querySchema, InternalSchema actualSchema) { + DataType queryType = internalSchemaToDataType(tableName, querySchema); + DataType actualType = internalSchemaToDataType(tableName, actualSchema); + CastMap castMap = new CastMap(); + InternalSchemaUtils.collectTypeChangedCols(querySchema, actualSchema).entrySet() + .stream() + .filter(e -> !isSameType(e.getValue().getLeft(), e.getValue().getRight())) + .forEach(e -> { + int pos = e.getKey(); + LogicalType target = queryType.getChildren().get(pos).getLogicalType(); + LogicalType actual = actualType.getChildren().get(pos).getLogicalType(); + castMap.add(pos, actual, target); + }); + return castMap; + } + + public Object castIfNeed(int pos, Object val) { + Cast cast = castMap.get(pos); + if (cast == null) { + return val; + } + return cast(val, cast.from(), cast.to()); + } + + private Object cast(Object val, LogicalType fromType, LogicalType toType) { + LogicalTypeRoot from = fromType.getTypeRoot(); + LogicalTypeRoot to = toType.getTypeRoot(); + switch (to) { + case BIGINT: { + // Integer => Long + if (from == INTEGER) { + return ((Number) val).longValue(); + } + break; + } + case FLOAT: { + // Integer => Float + // Long => Float + if (from == INTEGER || from == BIGINT) { + return ((Number) val).floatValue(); + } + break; + } + case DOUBLE: { + // Integer => Double + // Long => Double + if (from == INTEGER || from == BIGINT) { + return ((Number) val).doubleValue(); + } + // Float => Double + if (from == FLOAT) { + return Double.parseDouble(val.toString()); + } + break; + } + case DECIMAL: { + // Integer => Decimal + // Long => Decimal + // Double => Decimal + if (from == INTEGER || from == BIGINT || from == DOUBLE) { + return toDecimalData((Number) val, toType); + } + // Float => Decimal + if (from == FLOAT) { + return toDecimalData(Double.parseDouble(val.toString()), toType); + } + // String => Decimal + if (from == VARCHAR) { + return toDecimalData(Double.parseDouble(val.toString()), toType); + } + // Decimal => Decimal + if (from == DECIMAL) { + return toDecimalData(((DecimalData) val).toBigDecimal(), toType); + } + break; + } + case VARCHAR: { + // Integer => String + // Long => String + // Float => String + // Double => String + // Decimal => String + if (from == INTEGER + || from == BIGINT + || from == FLOAT + || from == DOUBLE + || from == DECIMAL) { + return new BinaryStringData(String.valueOf(val)); + } + // Date => String + if (from == DATE) { + return new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString()); + } + break; + } + case DATE: { + // String => Date + if (from == VARCHAR) { + return (int) LocalDate.parse(val.toString()).toEpochDay(); + } + break; + } + default: + } + return val; + } + + public boolean containsAnyPos(Collection positions) { + return positions.stream().anyMatch(castMap.keySet()::contains); + } + + public CastMap rearrange(List oldIndexes, List newIndexes) { + Preconditions.checkArgument(oldIndexes.size() == newIndexes.size()); + CastMap newCastMap = new CastMap(); + for (int i = 0; i < oldIndexes.size(); i++) { + Cast cast = castMap.get(oldIndexes.get(i)); + if (cast != null) { + newCastMap.add(newIndexes.get(i), cast.from(), cast.to()); + } + } + return newCastMap; + } + + @VisibleForTesting + void add(int pos, LogicalType from, LogicalType to) { + castMap.put(pos, new Cast(from, to)); + } + + private DecimalData toDecimalData(Number val, LogicalType decimalType) { + BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue()); + return toDecimalData(valAsDecimal, decimalType); + } + + private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) { + return DecimalData.fromBigDecimal( + valAsDecimal, + ((DecimalType) decimalType).getPrecision(), + ((DecimalType) decimalType).getScale()); + } + + private static boolean isSameType(Type left, Type right) { + if (left instanceof Types.DecimalType && right instanceof Types.DecimalType) { + return left.equals(right); + } + return left.typeId().equals(right.typeId()); + } + + private static DataType internalSchemaToDataType(String tableName, InternalSchema internalSchema) { + Schema schema = AvroInternalSchemaConverter.convert(internalSchema, tableName); + return AvroSchemaConverter.convertToDataType(schema); + } + + private static final class Cast implements Serializable { + private final LogicalType from; + private final LogicalType to; + + Cast(LogicalType from, LogicalType to) { + this.from = from; + this.to = to; + } + + LogicalType from() { + return from; + } + + LogicalType to() { + return to; + } + + @Override + public String toString() { + return from + " => " + to; + } + } + + @Override + public String toString() { + return castMap.entrySet().stream() + .map(e -> e.getKey() + ": " + e.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvoContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvoContext.java new file mode 100644 index 0000000000000..7e0ca29ad0408 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvoContext.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.RowDataCastProjection; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. + */ +public final class SchemaEvoContext implements Serializable { + private final InternalSchema querySchema; + private final HoodieTableMetaClient metaClient; + private List fieldNames; + private List fieldTypes; + private RowDataProjection projection; + + public SchemaEvoContext(InternalSchema querySchema, HoodieTableMetaClient metaClient) { + this.querySchema = querySchema; + this.metaClient = metaClient; + } + + public void evalActualFields(String fileName, int[] selectedFields) { + InternalSchema mergedSchema = getMergedSchema(fileName); + List fieldTypesWithMeta = AvroSchemaConverter.convertToDataType(AvroInternalSchemaConverter.convert(mergedSchema, tableName())).getChildren(); + List selectedFieldsList = getSelectedFields(selectedFields); + RowDataProjection projection = getProjection(mergedSchema, fieldTypesWithMeta, selectedFieldsList); + List fieldNamesWithMeta = mergedSchema.columns().stream().map(Types.Field::name).collect(Collectors.toList()); + setActualFields(fieldNamesWithMeta, fieldTypesWithMeta, projection); + } + + public String[] fieldNames() { + return fieldNames.toArray(new String[0]); + } + + public DataType[] fieldTypes() { + return fieldTypes.toArray(new DataType[0]); + } + + public Option projection() { + return Option.ofNullable(projection); + } + + private List getSelectedFields(int[] selectedFields) { + return Arrays.stream(selectedFields) + .boxed() + .map(pos -> pos + HOODIE_META_COLUMNS.size()) + .collect(Collectors.toList()); + } + + private InternalSchema getMergedSchema(String fileName) { + long commitTime = Long.parseLong(FSUtils.getCommitTime(fileName)); + InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false); + return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema(); + } + + private RowDataProjection getProjection(InternalSchema mergedSchema, List actualFieldTypesWithMeta, List selectedFields) { + CastMap castMap = CastMap.of(tableName(), querySchema, mergedSchema); + if (castMap.containsAnyPos(selectedFields)) { + List readType = new ArrayList<>(selectedFields.size()); + for (int pos : selectedFields) { + readType.add(actualFieldTypesWithMeta.get(pos).getLogicalType()); + } + return new RowDataCastProjection( + readType.toArray(new LogicalType[0]), + IntStream.range(0, selectedFields.size()).toArray(), + castMap.rearrange(selectedFields, IntStream.range(0, selectedFields.size()).boxed().collect(Collectors.toList())) + ); + } + return null; + } + + private void setActualFields(List fieldNames, List fieldTypes, RowDataProjection projection) { + this.fieldNames = fieldNames.subList(HOODIE_META_COLUMNS.size(), fieldNames.size()); + this.fieldTypes = fieldTypes.subList(HOODIE_META_COLUMNS.size(), fieldTypes.size()); + this.projection = projection; + } + + private String tableName() { + return metaClient.getTableConfig().getTableName(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index d0d37469349bd..6f75c68f7035d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -19,6 +19,8 @@ package org.apache.hudi.table.format.cow; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.table.format.SchemaEvoContext; import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.flink.api.common.io.FileInputFormat; @@ -28,6 +30,7 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; +import org.apache.flink.table.data.ColumnarRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; @@ -35,6 +38,7 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.util.RowDataProjection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +79,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat { private final boolean utcTimestamp; private final SerializableConfiguration conf; private final long limit; + private final Option schemaEvoContext; + private Option projection; private transient ParquetColumnarRowSplitReader reader; private transient long currentReadCount; @@ -92,7 +98,8 @@ public CopyOnWriteInputFormat( String partDefaultName, long limit, Configuration conf, - boolean utcTimestamp) { + boolean utcTimestamp, + Option schemaEvoContext) { super.setFilePaths(paths); this.limit = limit; this.partDefaultName = partDefaultName; @@ -101,10 +108,22 @@ public CopyOnWriteInputFormat( this.selectedFields = selectedFields; this.conf = new SerializableConfiguration(conf); this.utcTimestamp = utcTimestamp; + this.schemaEvoContext = schemaEvoContext; } @Override public void open(FileInputSplit fileSplit) throws IOException { + String[] actualFieldNames = fullFieldNames; + DataType[] actualFieldTypes = fullFieldTypes; + if (schemaEvoContext.isPresent()) { + schemaEvoContext.get().evalActualFields(fileSplit.getPath().getName(), selectedFields); + actualFieldNames = schemaEvoContext.get().fieldNames(); + actualFieldTypes = schemaEvoContext.get().fieldTypes(); + projection = schemaEvoContext.get().projection(); + } else { + projection = Option.empty(); + } + // generate partition specs. List fieldNameList = Arrays.asList(fullFieldNames); LinkedHashMap partSpec = PartitionPathUtils.extractPartitionSpecFromPath( @@ -118,8 +137,8 @@ public void open(FileInputSplit fileSplit) throws IOException { utcTimestamp, true, conf.conf(), - fullFieldNames, - fullFieldTypes, + actualFieldNames, + actualFieldTypes, partObjects, selectedFields, DEFAULT_SIZE, @@ -278,7 +297,11 @@ public boolean reachedEnd() throws IOException { @Override public RowData nextRecord(RowData reuse) { currentReadCount++; - return reader.nextRecord(); + ColumnarRowData rowData = reader.nextRecord(); + if (projection.isPresent()) { + return projection.get().project(rowData); + } + return rowData; } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java new file mode 100644 index 0000000000000..880e44f855b75 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.table.format.CastMap; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * As well as {@link RowDataProjection} projects the row data. + * In addition, fields are converted according to the CastMap. + */ +public final class RowDataCastProjection extends RowDataProjection { + private static final long serialVersionUID = 1L; + + private final CastMap castMap; + + public RowDataCastProjection(LogicalType[] types, int[] positions, CastMap castMap) { + super(types, positions); + this.castMap = castMap; + } + + @Override + public RowData project(RowData rowData) { + RowData.FieldGetter[] fields = fieldGetters(); + GenericRowData genericRowData = new GenericRowData(fields.length); + for (int i = 0; i < fields.length; i++) { + Object val = fields[i].getFieldOrNull(rowData); + if (val != null) { + val = castMap.castIfNeed(i, val); + } + genericRowData.setField(i, val); + } + return genericRowData; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 048f286fa0eca..6d0a84a2c6900 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -35,7 +35,7 @@ public class RowDataProjection implements Serializable { private final RowData.FieldGetter[] fieldGetters; - private RowDataProjection(LogicalType[] types, int[] positions) { + protected RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; @@ -78,4 +78,8 @@ public Object[] projectAsValues(RowData rowData) { } return values; } + + protected RowData.FieldGetter[] fieldGetters() { + return fieldGetters; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/ITTestReadWithSchemaEvo.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/ITTestReadWithSchemaEvo.java new file mode 100644 index 0000000000000..a228acdbe022f --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/ITTestReadWithSchemaEvo.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.schemaevo; + +import org.apache.flink.table.api.TableResult; +import org.apache.spark.sql.SparkSession; + +import java.util.concurrent.ExecutionException; + +import static org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE; + +/** + * Tests of reading when schema evolution enabled. + */ +@SuppressWarnings({"SqlNoDataSourceInspection", "SqlDialectInspection"}) +public class ITTestReadWithSchemaEvo extends TestSchemaEvoBase { + + @TestWhenSparkGreaterThan31 + public void testReadSnapshotBatchCOW() throws ExecutionException, InterruptedException { + String tablePath = tempFile.getAbsolutePath(); + try (SparkSession spark = spark()) { + spark.sql(String.format("set %s=true", SCHEMA_EVOLUTION_ENABLE.key())); + createAndPreparePartitionTable(spark, "t1", tablePath, "cow"); + spark.sql("alter table t1 add columns(newCol1 boolean after col4)"); + spark.sql("alter table t1 rename column col1 to renamedCol1"); + spark.sql("alter table t1 drop column col3"); + spark.sql("alter table t1 alter column col6 type string"); + spark.sql("alter table t1 add columns(newCol2 int after col8)"); + spark.sql("insert into t1 values (1,1,11,100001,101.01,100001.0001,true,'a000001','date->string','2021-12-25 12:01:01',true,10,'a01','2021-12-25')"); + } + //language=SQL + tEnv.executeSql( + "create table t1 (" + + " id int," + + " comb int," + + " col0 int," + + " renamedCol1 bigint," + + " col2 float," + + " col4 decimal(10,4)," + + " newCol1 boolean," + + " col5 string," + + " col6 string," + + " col7 timestamp," + + " col8 boolean," + + " newCol2 int," + + " col9 binary," + + " par date" + + " )" + + " partitioned by (par) with (" + + " 'connector' = 'hudi'," + + " 'path' = '" + tablePath + "'," + + " 'table.type' = 'COPY_ON_WRITE'," + + " 'read.streaming.enabled' = 'false'," + + " 'read.tasks' = '1'," + + " 'hoodie.datasource.query.type' = 'snapshot'," + + " 'hoodie.datasource.write.recordkey.field' = 'id'," + + " 'hoodie.datasource.write.hive_style_partitioning' = 'true'," + + " 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX'," + + " 'hoodie.schema.on.read.enable' = 'true'" + + ")" + ).await(); + TableResult tableResult = tEnv.executeSql("select par, newCol2, col6, newCol1, col4, renamedCol1, id from t1"); + checkAnswer( + tableResult, + "+I[2021-12-25, 10, date->string, true, 100001.0001, 100001, 1]", + "+I[2021-12-25, null, 2021-12-25, null, 100002.0002, 100002, 2]", + "+I[2021-12-25, null, 2021-12-25, null, 100003.0003, 100003, 3]", + "+I[2021-12-26, null, 2021-12-26, null, 100004.0004, 100004, 4]", + "+I[2021-12-26, null, 2021-12-26, null, 100005.0005, 100005, 5]" + ); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/TestSchemaEvoBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/TestSchemaEvoBase.java new file mode 100644 index 0000000000000..9bcc118dc236e --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/schemaevo/TestSchemaEvoBase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.schemaevo; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.hudi.HoodieSparkUtils; +import org.apache.spark.sql.hudi.TestSpark3DDL; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.platform.commons.util.AnnotationUtils.findAnnotation; + +public class TestSchemaEvoBase extends TestSpark3DDL { + + @TempDir + File tempFile; + + StreamExecutionEnvironment env; + StreamTableEnvironment tEnv; + + @BeforeEach + public void setUp() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + tEnv = StreamTableEnvironment.create(env); + } + + public void checkAnswer(TableResult actualResult, String... expectedResult) { + try (CloseableIterator iterator = actualResult.collect()) { + Set expected = new HashSet<>(Arrays.asList(expectedResult)); + Set actual = new HashSet<>(expected.size()); + for (int i = 0; i < expected.size() && iterator.hasNext(); i++) { + actual.add(iterator.next().toString()); + } + assertEquals(expected, actual); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Retention(RetentionPolicy.RUNTIME) + @ExtendWith(ITTestReadWithSchemaEvo.WhenSparkGreaterThan31.class) + @Test + public @interface TestWhenSparkGreaterThan31 {} + + public static final class WhenSparkGreaterThan31 implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + java.util.Optional annotation = findAnnotation(context.getElement(), ITTestReadWithSchemaEvo.TestWhenSparkGreaterThan31.class); + if (annotation.isPresent() && !HoodieSparkUtils.gteqSpark3_1()) { + return ConditionEvaluationResult.disabled("Spark version should be greater than 3.1"); + } + return ConditionEvaluationResult.enabled("OK"); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java new file mode 100644 index 0000000000000..17553ce7ded53 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; + +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link CastMap}. + */ +public class TestCastMap { + + @Test + public void testCastInt() { + CastMap castMap = new CastMap(); + castMap.add(0, new IntType(), new BigIntType()); + castMap.add(1, new IntType(), new FloatType()); + castMap.add(2, new IntType(), new DoubleType()); + castMap.add(3, new IntType(), new DecimalType()); + castMap.add(4, new IntType(), new VarCharType()); + int val = 1; + assertEquals(1L, castMap.castIfNeed(0, val)); + assertEquals(1.0F, castMap.castIfNeed(1, val)); + assertEquals(1.0, castMap.castIfNeed(2, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeed(3, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeed(4, val)); + } + + @Test + public void testCastLong() { + CastMap castMap = new CastMap(); + castMap.add(0, new BigIntType(), new FloatType()); + castMap.add(1, new BigIntType(), new DoubleType()); + castMap.add(2, new BigIntType(), new DecimalType()); + castMap.add(3, new BigIntType(), new VarCharType()); + long val = 1L; + assertEquals(1.0F, castMap.castIfNeed(0, val)); + assertEquals(1.0, castMap.castIfNeed(1, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeed(2, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeed(3, val)); + } + + @Test + public void testCastFloat() { + CastMap castMap = new CastMap(); + castMap.add(0, new FloatType(), new DoubleType()); + castMap.add(1, new FloatType(), new DecimalType()); + castMap.add(2, new FloatType(), new VarCharType()); + float val = 1F; + assertEquals(1.0, castMap.castIfNeed(0, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeed(1, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeed(2, val)); + } + + @Test + public void testCastDouble() { + CastMap castMap = new CastMap(); + castMap.add(0, new DoubleType(), new DecimalType()); + castMap.add(1, new DoubleType(), new VarCharType()); + double val = 1; + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeed(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeed(1, val)); + } + + @Test + public void testCastDecimal() { + CastMap castMap = new CastMap(); + castMap.add(0, new DecimalType(2, 1), new DecimalType(3, 2)); + castMap.add(1, new DecimalType(), new VarCharType()); + DecimalData val = DecimalData.fromBigDecimal(BigDecimal.ONE, 2, 1); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 3, 2), castMap.castIfNeed(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeed(1, val)); + } + + @Test + public void testCastString() { + CastMap castMap = new CastMap(); + castMap.add(0, new VarCharType(), new DecimalType()); + castMap.add(1, new VarCharType(), new DateType()); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeed(0, BinaryStringData.fromString("1.0"))); + assertEquals((int) LocalDate.parse("2022-05-12").toEpochDay(), castMap.castIfNeed(1, BinaryStringData.fromString("2022-05-12"))); + } + + @Test + public void testCastDate() { + CastMap castMap = new CastMap(); + castMap.add(0, new DateType(), new VarCharType()); + assertEquals(BinaryStringData.fromString("2022-05-12"), castMap.castIfNeed(0, (int) LocalDate.parse("2022-05-12").toEpochDay())); + } +}