diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index c0a4f7731689d..bc76f94e41906 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -243,7 +243,14 @@ public HoodieCleanMetadata execute() { // try to clean old history schema. try { FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient()); - fss.cleanOldFiles(pendingCleanInstants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList())); + int versionsRetained = table.getConfig().getCleanerFileVersionsRetained(); + List validCommits = table + .getActiveTimeline() + .filterCompletedAndCompactionInstants() + .getInstantsAsStream() + .sorted(HoodieInstant.COMPARATOR.reversed()) + .skip(versionsRetained > 0 ? versionsRetained : 1).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + fss.cleanOldFiles(validCommits); } catch (Exception e) { // we should not affect original clean logic. Swallow exception and log warn. LOG.warn("failed to clean old history schema"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 43f40a778a072..d67614dad0c1b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -19,7 +19,6 @@ package org.apache.hudi.table.action.commit; import org.apache.avro.Schema; -import org.apache.avro.SchemaCompatibility; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -29,8 +28,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.collection.MappingIterator; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; @@ -150,6 +149,7 @@ private Option> composeSchemaEvolutionTra if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { // check implicitly add columns, and position reorder(spark sql may change cols order) InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get()); + // get base file schema. Due to schema evolution, base file schema maybe not equal to querySchema/tableSchema long commitInstantTime = Long.parseLong(baseFile.getCommitTime()); InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient); if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { @@ -161,6 +161,8 @@ private Option> composeSchemaEvolutionTra } } final InternalSchema writeInternalSchema = fileSchema; + // verify whether the file schema is consistent with the querySchema/tableSchema + // if not, we need rewrite base file records. List colNamesFromQuerySchema = querySchema.getAllColsFullName(); List colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName(); List sameCols = colNamesFromWriteSchema.stream() @@ -168,7 +170,7 @@ private Option> composeSchemaEvolutionTra int writerSchemaFieldId = writeInternalSchema.findIdByName(f); int querySchemaFieldId = querySchema.findIdByName(f); - return colNamesFromQuerySchema.contains(f) + return colNamesFromQuerySchema.contains(f) // check column name. && writerSchemaFieldId == querySchemaFieldId && writerSchemaFieldId != -1 && Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId)); @@ -177,9 +179,7 @@ private Option> composeSchemaEvolutionTra InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(); Schema newWriterSchema = AvroInternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName()); - Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName()); - boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() - || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; + boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size(); if (needToReWriteRecord) { Map renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema); return Option.of(record -> rewriteRecordWithNewSchema(record, newWriterSchema, renameCols)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index a0e5ec22f9f5f..e14525b8667b7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; @@ -85,9 +86,12 @@ public HoodieWriteMetadata> execute() { // try to load internalSchema to support schema Evolution HoodieWriteConfig configCopy = config; - Pair, Option> schemaPair = InternalSchemaCache - .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime); - if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) { + boolean schemaEvolutionEnable = new TableSchemaResolver(table.getMetaClient()).getTableInternalSchemaFromCommitMetadata().isPresent(); + Pair, Option> schemaPair = Pair.of(Option.empty(), Option.empty()); + if (schemaEvolutionEnable) { + schemaPair = InternalSchemaCache.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime); + } + if (schemaEvolutionEnable && schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) { // should not influence the original config, just copy it configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build(); configCopy.setInternalSchemaString(schemaPair.getLeft().get()); @@ -105,7 +109,7 @@ public HoodieWriteMetadata> execute() { metadata.addWriteStat(stat.getPartitionPath(), stat); } metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); - if (schemaPair.getLeft().isPresent()) { + if (schemaEvolutionEnable) { metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get()); metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 5ac0b4cfe2c28..e423b313a1b79 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -809,12 +809,13 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr Schema.Field field = fields.get(i); String fieldName = field.name(); fieldNames.push(fieldName); - if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) { + // check rename + String fieldNameFromOldSchema = renameCols.isEmpty() ? "" : renameCols.getOrDefault(createFullName(fieldNames), ""); + + if (oldSchema.getField(field.name()) != null && fieldNameFromOldSchema.isEmpty()) { Schema.Field oldField = oldSchema.getField(field.name()); newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames)); } else { - String fieldFullName = createFullName(fieldNames); - String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, ""); // deal with rename if (oldSchema.getField(fieldNameFromOldSchema) != null) { // find rename diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 0b5066792f6e2..a42bbd790f2e8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -52,6 +52,9 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -67,6 +70,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -127,6 +131,8 @@ public abstract class AbstractHoodieLogRecordReader { private AtomicLong totalLogFiles = new AtomicLong(0); // Internal schema, used to support full schema evolution. private InternalSchema internalSchema; + // Historical Schemas, only used when schema evolution enabled. + private TreeMap historicalSchemas; // Hoodie table path. private final String path; // Total log blocks read - for metrics @@ -810,13 +816,23 @@ private Option> composeEvolvedSchemaTrans } long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME)); - InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, - hoodieTableMetaClient, false); + if (historicalSchemas == null) { + FileBasedInternalSchemaStorageManager schemaStorageManager = new FileBasedInternalSchemaStorageManager(hoodieTableMetaClient); + historicalSchemas = SerDeHelper.parseSchemas(schemaStorageManager.getHistorySchemaStr()); + } + InternalSchema fileSchema; + long maxVersionId = historicalSchemas.keySet().stream().max(Long::compareTo).orElse(0L); + if (maxVersionId >= currentInstantTime) { + fileSchema = InternalSchemaUtils.searchSchema(currentInstantTime, historicalSchemas); + } else { + fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(currentInstantTime, hoodieTableMetaClient); + historicalSchemas.put(currentInstantTime, fileSchema); + } InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema, true, false).mergeSchema(); Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName()); - return Option.of((record) -> rewriteRecordWithNewSchema(record, mergedAvroSchema, Collections.emptyMap())); + return Option.of((record) -> rewriteRecordWithNewSchema(record, mergedAvroSchema, InternalSchemaUtils.collectRenameCols(fileSchema, internalSchema))); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java index 5f5a8763409d2..6ddf47115399b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java @@ -81,13 +81,8 @@ public class InternalSchemaCache { * @return internalSchema */ public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient, boolean cacheEnable) { - Option candidateSchema = getSchemaByReadingCommitFile(versionID, metaClient); - if (candidateSchema.isPresent()) { - return candidateSchema.get(); - } if (!cacheEnable) { - // parse history schema and return directly - return InternalSchemaUtils.searchSchema(versionID, getHistoricalSchemas(metaClient)); + getInternalSchemaByVersionId(versionID, metaClient); } String tablePath = metaClient.getBasePath(); // use segment lock to reduce competition. @@ -117,22 +112,6 @@ private static TreeMap getHistoricalSchemas(HoodieTableMet return result; } - private static Option getSchemaByReadingCommitFile(long versionID, HoodieTableMetaClient metaClient) { - try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - List instants = timeline.getInstantsAsStream().filter(f -> f.getTimestamp().equals(String.valueOf(versionID))).collect(Collectors.toList()); - if (instants.isEmpty()) { - return Option.empty(); - } - byte[] data = timeline.getInstantDetails(instants.get(0)).get(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); - String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA); - return SerDeHelper.fromJson(latestInternalSchemaStr); - } catch (Exception e) { - throw new HoodieException("Failed to read schema from commit metadata", e); - } - } - /** * Get internalSchema and avroSchema for compaction/cluster operation. * @@ -142,24 +121,36 @@ private static Option getSchemaByReadingCommitFile(long versionI */ public static Pair, Option> getInternalSchemaAndAvroSchemaForClusteringAndCompaction(HoodieTableMetaClient metaClient, String compactionAndClusteringInstant) { // try to load internalSchema to support Schema Evolution - HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getCommitsAndCompactionTimeline().findInstantsBefore(compactionAndClusteringInstant).filterCompletedInstants(); + HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getActiveTimeline() + .getCommitsTimeline().filterCompletedAndCompactionInstants().findInstantsBefore(compactionAndClusteringInstant); Option lastInstantBeforeCurrentCompaction = timelineBeforeCurrentCompaction.lastInstant(); if (lastInstantBeforeCurrentCompaction.isPresent()) { // try to find internalSchema - byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get(); HoodieCommitMetadata metadata; try { + byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get(); metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); } catch (Exception e) { throw new HoodieException(String.format("cannot read metadata from commit: %s", lastInstantBeforeCurrentCompaction.get()), e); } String internalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA); - if (internalSchemaStr != null) { - String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - return Pair.of(Option.of(internalSchemaStr), Option.of(existingSchemaStr)); + String avroSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + if (!StringUtils.isNullOrEmpty(internalSchemaStr)) { + return Pair.of(Option.of(internalSchemaStr), Option.of(avroSchemaStr)); } + LOG.info(String.format("Schema evolution has not occurred before compaction commit: %s", compactionAndClusteringInstant)); + Schema avroSchemaWithMeta = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchemaStr)); + internalSchemaStr = SerDeHelper.toJson(AvroInternalSchemaConverter.convert(avroSchemaWithMeta)); + return Pair.of(Option.of(internalSchemaStr), Option.of(avroSchemaWithMeta.toString())); + } + LOG.warn(String.format("cannot find any valid completed commits before current cluster/compaction instantTime: %s", compactionAndClusteringInstant)); + // if a commits before compaction is cleaned, try to find internalSchema by search + Option oldInternalSchemaOpt = Option.of(InternalSchemaUtils.searchSchema(Long.parseLong(compactionAndClusteringInstant), getHistoricalSchemas(metaClient))); + if (oldInternalSchemaOpt.get().isEmptySchema()) { + throw new HoodieException(String.format("failed get internalSchema for compaction/clustering instant: %s", compactionAndClusteringInstant)); } - return Pair.of(Option.empty(), Option.empty()); + return Pair.of(oldInternalSchemaOpt.map(SerDeHelper::toJson), + oldInternalSchemaOpt.map(f -> AvroInternalSchemaConverter.convert(f, metaClient.getTableConfig().getTableName()).toString())); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java index 3cd72886c4816..08b929f8f0e59 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java @@ -53,25 +53,23 @@ public class FileBasedInternalSchemaStorageManager extends AbstractInternalSchemaStorageManager { private static final Logger LOG = LogManager.getLogger(FileBasedInternalSchemaStorageManager.class); - public static final String SCHEMA_NAME = ".schema"; private final Path baseSchemaPath; private final Configuration conf; private HoodieTableMetaClient metaClient; public FileBasedInternalSchemaStorageManager(Configuration conf, Path baseTablePath) { - Path metaPath = new Path(baseTablePath, ".hoodie"); - this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME); + Path metaPath = new Path(baseTablePath, HoodieTableMetaClient.METAFOLDER_NAME); + this.baseSchemaPath = new Path(metaPath, HoodieTableMetaClient.SCHEMA_FOLDER_NAME); this.conf = conf; } public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) { - Path metaPath = new Path(metaClient.getBasePath(), ".hoodie"); - this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME); + Path metaPath = new Path(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME); + this.baseSchemaPath = new Path(metaPath, HoodieTableMetaClient.SCHEMA_FOLDER_NAME); this.conf = metaClient.getHadoopConf(); this.metaClient = metaClient; } - // make metaClient build lazy private HoodieTableMetaClient getMetaClient() { if (metaClient == null) { metaClient = HoodieTableMetaClient.builder().setBasePath(baseSchemaPath.getParent().getParent().toString()).setConf(conf).build(); @@ -102,7 +100,9 @@ private void cleanResidualFiles() { // clean residual files residualSchemaFiles.forEach(f -> { try { - fs.delete(new Path(getMetaClient().getSchemaFolderName(), f)); + Path residualFile = new Path(getMetaClient().getSchemaFolderName(), f); + fs.delete(residualFile); + LOG.debug(String.format("clean residual schema file: %s", residualFile)); } catch (IOException o) { throw new HoodieException(o); } @@ -116,13 +116,16 @@ private void cleanResidualFiles() { public void cleanOldFiles(List validateCommits) { try { FileSystem fs = baseSchemaPath.getFileSystem(conf); - if (fs.exists(baseSchemaPath)) { + if (fs.exists(baseSchemaPath) && validateCommits.size() > 0) { List candidateSchemaFiles = Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile()) .map(file -> file.getPath().getName()).collect(Collectors.toList()); List validateSchemaFiles = candidateSchemaFiles.stream().filter(f -> validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList()); for (int i = 0; i < validateSchemaFiles.size(); i++) { - fs.delete(new Path(validateSchemaFiles.get(i))); + Path cleanFile = new Path(getMetaClient().getSchemaFolderName(), validateSchemaFiles.get(i)); + fs.delete(cleanFile); + LOG.debug(String.format("clean schema file: %s", cleanFile)); } + LOG.debug(String.format("finish clean old history schema files: %s", String.join(",", validateSchemaFiles))); } } catch (IOException e) { throw new HoodieException(e); @@ -139,6 +142,17 @@ public String getHistorySchemaStr() { return getHistorySchemaStrByGivenValidCommits(Collections.EMPTY_LIST); } + private String readSchemaFromFile(FileSystem fs, Path filePath) throws HoodieIOException { + byte[] content; + try (FSDataInputStream is = fs.open(filePath)) { + content = FileIOUtils.readAsByteArray(is); + LOG.debug(String.format("read history schema success from file : %s", filePath)); + return new String(content, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new HoodieIOException("Could not read history schema from " + filePath, e); + } + } + @Override public String getHistorySchemaStrByGivenValidCommits(List validCommits) { List commitList = validCommits == null || validCommits.isEmpty() ? getValidInstants() : validCommits; @@ -149,21 +163,13 @@ public String getHistorySchemaStrByGivenValidCommits(List validCommits) .filter(f -> f.isFile() && f.getPath().getName().endsWith(SCHEMA_COMMIT_ACTION)) .map(file -> file.getPath().getName()).filter(f -> commitList.contains(f.split("\\.")[0])).sorted().collect(Collectors.toList()); if (!validaSchemaFiles.isEmpty()) { - Path latestFilePath = new Path(baseSchemaPath, validaSchemaFiles.get(validaSchemaFiles.size() - 1)); - byte[] content; - try (FSDataInputStream is = fs.open(latestFilePath)) { - content = FileIOUtils.readAsByteArray(is); - LOG.info(String.format("read history schema success from file : %s", latestFilePath)); - return new String(content, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new HoodieIOException("Could not read history schema from " + latestFilePath, e); - } + return readSchemaFromFile(fs, new Path(baseSchemaPath, validaSchemaFiles.get(validaSchemaFiles.size() - 1))); } } } catch (IOException io) { throw new HoodieException(io); } - LOG.info("failed to read history schema"); + LOG.warn(String.format("unable to verify the validity of historical schema files by given commits: %s", String.join(",", validCommits))); return ""; } diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/io/TestFileBasedInternalSchemaStorageManager.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/io/TestFileBasedInternalSchemaStorageManager.java index 4fbf300356c1d..e0c96e692b2ac 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/io/TestFileBasedInternalSchemaStorageManager.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/io/TestFileBasedInternalSchemaStorageManager.java @@ -34,6 +34,8 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -93,6 +95,14 @@ public void testPersistAndReadHistorySchemaStr() throws IOException { File f = new File(metaClient.getSchemaFolderName() + File.separator + "0002.schemacommit"); assertTrue(!f.exists()); assertEquals(lastSchema, fm.getSchemaByKey("3").get()); + // clean old schema files + List validCommits = metaClient + .reloadActiveTimeline() + .getCommitsTimeline() + .filterCompletedAndCompactionInstants() + .getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + fm.cleanOldFiles(validCommits); + assertTrue(fm.getHistorySchemaStr().isEmpty()); } private void simulateCommit(String commitTime) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index ed9db3a5aa424..55fe7b7288ce9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -669,4 +669,55 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } } + + test("Test non-batch read for schema change table") { + withTempDir { tmp => + Seq("MERGE_ON_READ").foreach { tableType => + val tableName = generateTableName + val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" + if (HoodieSparkUtils.gteqSpark3_1) { + val tableName = generateTableName + val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" + spark.sql("set hoodie.schema.on.read.enable=true") + spark.sql( + s"""create table $tableName + |(id int, comb int, col0 date, col1 timestamp, col5 string, col6 string, par date) + |using hudi + |location '$tablePath' + |options(type='mor', primaryKey='id', preCombineField='comb')""" + .stripMargin) + + spark.sql( + s"""insert into $tableName + |values + |(1,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(2,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(3,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(4,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(5,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(6,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(7,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(8,10,'2021-12-27','2021-12-27 21:23:00','2021-12-27','2021-12-27 12:32:00','2022-01-01'), + |(9,20,'2021-12-28','2021-12-28 21:23:00','2021-12-27','2021-12-28 12:32:00','2022-01-01')""" + .stripMargin) + spark.sql(s"alter table $tableName alter column col0 type string") + spark.sql(s"alter table $tableName alter column col5 type date") + // disable batch read + spark.sessionState.conf.setConfString("spark.sql.codegen.maxFields", "2") + spark.sessionState.conf.setConfString("spark.sql.parquet.columnarReaderBatchSize", "2") + spark.sql(s"select * from $tableName").show(false) + checkAnswer(spark.sql(s"select count(distinct(id)), count(distinct(col5)) from $tableName").collect())( + Seq(9, 1) + ) + // enable batch read + spark.sessionState.conf.setConfString("spark.sql.codegen.maxFields", "100") + spark.sessionState.conf.setConfString("spark.sql.parquet.columnarReaderBatchSize", "4096") + checkAnswer(spark.sql(s"select count(distinct(id)), count(distinct(col5)) from $tableName").collect())( + Seq(9, 1) + ) + spark.sql(s"select * from $tableName order by id").show(false) + } + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieVectorizedParquetRecordReader.java b/hudi-spark-datasource/hudi-spark3.1.x/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieVectorizedParquetRecordReader.java index d5108b94fceb0..9aeb74c1fb092 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieVectorizedParquetRecordReader.java +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieVectorizedParquetRecordReader.java @@ -55,6 +55,9 @@ public class Spark31HoodieVectorizedParquetRecordReader extends VectorizedParque // The memory mode of the columnarBatch. private final MemoryMode memoryMode; + // Need to rewrite vector + private final boolean needReWriteVectors; + /** * Batch of rows that we assemble and the current index we've returned. Every time this * batch is used up (batchIdx == numBatched), we populated the batch. @@ -73,6 +76,7 @@ public Spark31HoodieVectorizedParquetRecordReader( memoryMode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.typeChangeInfos = typeChangeInfos; this.capacity = capacity; + needReWriteVectors = (typeChangeInfos != null && !typeChangeInfos.isEmpty()); } @Override @@ -112,31 +116,40 @@ public void close() throws IOException { @Override public ColumnarBatch resultBatch() { ColumnarBatch currentColumnBatch = super.resultBatch(); - boolean changed = false; + if (!needReWriteVectors) { + return currentColumnBatch; + } + + setUpColumnarBatch(); + + if (columnarBatch != null) { + return columnarBatch; + } else { + return currentColumnBatch; + } + } + + private void setUpColumnarBatch() { + ColumnarBatch currentColumnBatch = super.resultBatch(); + for (Map.Entry> entry : typeChangeInfos.entrySet()) { boolean rewrite = SparkInternalSchemaConverter .convertColumnVectorType((WritableColumnVector) currentColumnBatch.column(entry.getKey()), idToColumnVectors.get(entry.getKey()), currentColumnBatch.numRows()); if (rewrite) { - changed = true; columnVectors[entry.getKey()] = idToColumnVectors.get(entry.getKey()); } } - if (changed) { - if (columnarBatch == null) { - // fill other vector - for (int i = 0; i < columnVectors.length; i++) { - if (columnVectors[i] == null) { - columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i); - } + if (columnarBatch == null) { + // fill other vector + for (int i = 0; i < columnVectors.length; i++) { + if (columnVectors[i] == null) { + columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i); } - columnarBatch = new ColumnarBatch(columnVectors); } - columnarBatch.setNumRows(currentColumnBatch.numRows()); - return columnarBatch; - } else { - return currentColumnBatch; + columnarBatch = new ColumnarBatch(columnVectors); } + columnarBatch.setNumRows(currentColumnBatch.numRows()); } @Override @@ -145,6 +158,7 @@ public boolean nextBatch() throws IOException { if (idToColumnVectors != null) { idToColumnVectors.entrySet().stream().forEach(e -> e.getValue().reset()); } + // Trigger vector rewrite numBatched = resultBatch().numRows(); batchIdx = 0; return result; @@ -158,20 +172,22 @@ public void enableReturningBatches() { @Override public Object getCurrentValue() { - if (typeChangeInfos == null || typeChangeInfos.isEmpty()) { + if (!needReWriteVectors || columnarBatch == null) { return super.getCurrentValue(); } - - if (returnColumnarBatch) { - return columnarBatch == null ? super.getCurrentValue() : columnarBatch; - } - - return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1); + return returnColumnarBatch ? columnarBatch : columnarBatch.getRow(batchIdx - 1); } @Override public boolean nextKeyValue() throws IOException { - resultBatch(); + // Trigger to create first columnarBatch + if (columnarBatch == null) { + resultBatch(); + } + + if (!needReWriteVectors || columnarBatch == null) { + return super.nextKeyValue(); + } if (returnColumnarBatch) { return nextBatch(); diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieVectorizedParquetRecordReader.java b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieVectorizedParquetRecordReader.java index 6ce054c5955f3..68b7a77505778 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieVectorizedParquetRecordReader.java +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieVectorizedParquetRecordReader.java @@ -55,6 +55,9 @@ public class Spark32PlusHoodieVectorizedParquetRecordReader extends VectorizedPa // The memory mode of the columnarBatch. private final MemoryMode memoryMode; + // Need to rewrite vector + private final boolean needReWriteVectors; + /** * Batch of rows that we assemble and the current index we've returned. Every time this * batch is used up (batchIdx == numBatched), we populated the batch. @@ -75,6 +78,7 @@ public Spark32PlusHoodieVectorizedParquetRecordReader( memoryMode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.typeChangeInfos = typeChangeInfos; this.capacity = capacity; + needReWriteVectors = (typeChangeInfos != null && !typeChangeInfos.isEmpty()); } @Override @@ -114,31 +118,40 @@ public void close() throws IOException { @Override public ColumnarBatch resultBatch() { ColumnarBatch currentColumnBatch = super.resultBatch(); - boolean changed = false; + if (!needReWriteVectors) { + return currentColumnBatch; + } + + setUpColumnarBatch(); + + if (columnarBatch != null) { + return columnarBatch; + } else { + return currentColumnBatch; + } + } + + private void setUpColumnarBatch() { + ColumnarBatch currentColumnBatch = super.resultBatch(); + for (Map.Entry> entry : typeChangeInfos.entrySet()) { boolean rewrite = SparkInternalSchemaConverter .convertColumnVectorType((WritableColumnVector) currentColumnBatch.column(entry.getKey()), idToColumnVectors.get(entry.getKey()), currentColumnBatch.numRows()); if (rewrite) { - changed = true; columnVectors[entry.getKey()] = idToColumnVectors.get(entry.getKey()); } } - if (changed) { - if (columnarBatch == null) { - // fill other vector - for (int i = 0; i < columnVectors.length; i++) { - if (columnVectors[i] == null) { - columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i); - } + if (columnarBatch == null) { + // fill other vector + for (int i = 0; i < columnVectors.length; i++) { + if (columnVectors[i] == null) { + columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i); } - columnarBatch = new ColumnarBatch(columnVectors); } - columnarBatch.setNumRows(currentColumnBatch.numRows()); - return columnarBatch; - } else { - return currentColumnBatch; + columnarBatch = new ColumnarBatch(columnVectors); } + columnarBatch.setNumRows(currentColumnBatch.numRows()); } @Override @@ -147,6 +160,7 @@ public boolean nextBatch() throws IOException { if (idToColumnVectors != null) { idToColumnVectors.entrySet().stream().forEach(e -> e.getValue().reset()); } + // Trigger vector rewrite numBatched = resultBatch().numRows(); batchIdx = 0; return result; @@ -160,20 +174,22 @@ public void enableReturningBatches() { @Override public Object getCurrentValue() { - if (typeChangeInfos == null || typeChangeInfos.isEmpty()) { + if (!needReWriteVectors || columnarBatch == null) { return super.getCurrentValue(); } - - if (returnColumnarBatch) { - return columnarBatch == null ? super.getCurrentValue() : columnarBatch; - } - - return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1); + return returnColumnarBatch ? columnarBatch : columnarBatch.getRow(batchIdx - 1); } @Override public boolean nextKeyValue() throws IOException { - resultBatch(); + // Trigger to create first columnarBatch + if (columnarBatch == null) { + resultBatch(); + } + + if (!needReWriteVectors || columnarBatch == null) { + return super.nextKeyValue(); + } if (returnColumnarBatch) { return nextBatch();