Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,14 @@ public HoodieCleanMetadata execute() {
// try to clean old history schema.
try {
FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
fss.cleanOldFiles(pendingCleanInstants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList()));
int versionsRetained = table.getConfig().getCleanerFileVersionsRetained();
List<String> validCommits = table
.getActiveTimeline()
.filterCompletedAndCompactionInstants()
.getInstantsAsStream()
.sorted(HoodieInstant.COMPARATOR.reversed())
.skip(versionsRetained > 0 ? versionsRetained : 1).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
fss.cleanOldFiles(validCommits);
} catch (Exception e) {
// we should not affect original clean logic. Swallow exception and log warn.
LOG.warn("failed to clean old history schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -29,8 +28,8 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.collection.MappingIterator;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
Expand Down Expand Up @@ -150,6 +149,7 @@ private Option<Function<GenericRecord, GenericRecord>> composeSchemaEvolutionTra
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get());
// get base file schema. Due to schema evolution, base file schema maybe not equal to querySchema/tableSchema
long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
Expand All @@ -161,14 +161,16 @@ private Option<Function<GenericRecord, GenericRecord>> composeSchemaEvolutionTra
}
}
final InternalSchema writeInternalSchema = fileSchema;
// verify whether the file schema is consistent with the querySchema/tableSchema
// if not, we need rewrite base file records.
List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
List<String> sameCols = colNamesFromWriteSchema.stream()
.filter(f -> {
int writerSchemaFieldId = writeInternalSchema.findIdByName(f);
int querySchemaFieldId = querySchema.findIdByName(f);

return colNamesFromQuerySchema.contains(f)
return colNamesFromQuerySchema.contains(f) // check column name.
&& writerSchemaFieldId == querySchemaFieldId
&& writerSchemaFieldId != -1
&& Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId));
Expand All @@ -177,9 +179,7 @@ private Option<Function<GenericRecord, GenericRecord>> composeSchemaEvolutionTra
InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema,
true, false, false).mergeSchema();
Schema newWriterSchema = AvroInternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove useless check

boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
|| SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size();
if (needToReWriteRecord) {
Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
return Option.of(record -> rewriteRecordWithNewSchema(record, newWriterSchema, renameCols));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -85,9 +86,12 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {

// try to load internalSchema to support schema Evolution
HoodieWriteConfig configCopy = config;
Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
boolean schemaEvolutionEnable = new TableSchemaResolver(table.getMetaClient()).getTableInternalSchemaFromCommitMetadata().isPresent();
Pair<Option<String>, Option<String>> schemaPair = Pair.of(Option.empty(), Option.empty());
if (schemaEvolutionEnable) {
schemaPair = InternalSchemaCache.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize the code: trigger the corresponding logic only when schema evolution is enabled.

}
if (schemaEvolutionEnable && schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
// should not influence the original config, just copy it
configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build();
configCopy.setInternalSchemaString(schemaPair.getLeft().get());
Expand All @@ -105,7 +109,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
if (schemaPair.getLeft().isPresent()) {
if (schemaEvolutionEnable) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,12 +809,13 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
// check rename
String fieldNameFromOldSchema = renameCols.isEmpty() ? "" : renameCols.getOrDefault(createFullName(fieldNames), "");

if (oldSchema.getField(field.name()) != null && fieldNameFromOldSchema.isEmpty()) {
Schema.Field oldField = oldSchema.getField(field.name());
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
String fieldFullName = createFullName(fieldNames);
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
// deal with rename
if (oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand All @@ -67,6 +70,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -127,6 +131,8 @@ public abstract class AbstractHoodieLogRecordReader {
private AtomicLong totalLogFiles = new AtomicLong(0);
// Internal schema, used to support full schema evolution.
private InternalSchema internalSchema;
// Historical Schemas, only used when schema evolution enabled.
private TreeMap<Long, InternalSchema> historicalSchemas;
// Hoodie table path.
private final String path;
// Total log blocks read - for metrics
Expand Down Expand Up @@ -810,13 +816,23 @@ private Option<Function<IndexedRecord, IndexedRecord>> composeEvolvedSchemaTrans
}

long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
hoodieTableMetaClient, false);
if (historicalSchemas == null) {
FileBasedInternalSchemaStorageManager schemaStorageManager = new FileBasedInternalSchemaStorageManager(hoodieTableMetaClient);
historicalSchemas = SerDeHelper.parseSchemas(schemaStorageManager.getHistorySchemaStr());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache the historical schemas to reduce the overhead of searching for the file schema.
In our environment we have a log file with 1700+ Avro blocks;
with the original logic, performing 1700 separate file-schema lookups is very time-consuming.

}
InternalSchema fileSchema;
long maxVersionId = historicalSchemas.keySet().stream().max(Long::compareTo).orElse(0L);
if (maxVersionId >= currentInstantTime) {
fileSchema = InternalSchemaUtils.searchSchema(currentInstantTime, historicalSchemas);
} else {
fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(currentInstantTime, hoodieTableMetaClient);
historicalSchemas.put(currentInstantTime, fileSchema);
}
InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
true, false).mergeSchema();
Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName());

return Option.of((record) -> rewriteRecordWithNewSchema(record, mergedAvroSchema, Collections.emptyMap()));
return Option.of((record) -> rewriteRecordWithNewSchema(record, mergedAvroSchema, InternalSchemaUtils.collectRenameCols(fileSchema, internalSchema)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,8 @@ public class InternalSchemaCache {
* @return internalSchema
*/
public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient, boolean cacheEnable) {
Option<InternalSchema> candidateSchema = getSchemaByReadingCommitFile(versionID, metaClient);
if (candidateSchema.isPresent()) {
return candidateSchema.get();
}
if (!cacheEnable) {
// parse history schema and return directly
return InternalSchemaUtils.searchSchema(versionID, getHistoricalSchemas(metaClient));
getInternalSchemaByVersionId(versionID, metaClient);
}
String tablePath = metaClient.getBasePath();
// use segment lock to reduce competition.
Expand Down Expand Up @@ -117,22 +112,6 @@ private static TreeMap<Long, InternalSchema> getHistoricalSchemas(HoodieTableMet
return result;
}

private static Option<InternalSchema> getSchemaByReadingCommitFile(long versionID, HoodieTableMetaClient metaClient) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> instants = timeline.getInstantsAsStream().filter(f -> f.getTimestamp().equals(String.valueOf(versionID))).collect(Collectors.toList());
if (instants.isEmpty()) {
return Option.empty();
}
byte[] data = timeline.getInstantDetails(instants.get(0)).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
return SerDeHelper.fromJson(latestInternalSchemaStr);
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
}

/**
* Get internalSchema and avroSchema for compaction/cluster operation.
*
Expand All @@ -142,24 +121,36 @@ private static Option<InternalSchema> getSchemaByReadingCommitFile(long versionI
*/
public static Pair<Option<String>, Option<String>> getInternalSchemaAndAvroSchemaForClusteringAndCompaction(HoodieTableMetaClient metaClient, String compactionAndClusteringInstant) {
// try to load internalSchema to support Schema Evolution
HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getCommitsAndCompactionTimeline().findInstantsBefore(compactionAndClusteringInstant).filterCompletedInstants();
HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getActiveTimeline()
.getCommitsTimeline().filterCompletedAndCompactionInstants().findInstantsBefore(compactionAndClusteringInstant);
Option<HoodieInstant> lastInstantBeforeCurrentCompaction = timelineBeforeCurrentCompaction.lastInstant();
if (lastInstantBeforeCurrentCompaction.isPresent()) {
// try to find internalSchema
byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
HoodieCommitMetadata metadata;
try {
byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
} catch (Exception e) {
throw new HoodieException(String.format("cannot read metadata from commit: %s", lastInstantBeforeCurrentCompaction.get()), e);
}
String internalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
if (internalSchemaStr != null) {
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
return Pair.of(Option.of(internalSchemaStr), Option.of(existingSchemaStr));
String avroSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
if (!StringUtils.isNullOrEmpty(internalSchemaStr)) {
return Pair.of(Option.of(internalSchemaStr), Option.of(avroSchemaStr));
}
LOG.info(String.format("Schema evolution has not occurred before compaction commit: %s", compactionAndClusteringInstant));
Schema avroSchemaWithMeta = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchemaStr));
internalSchemaStr = SerDeHelper.toJson(AvroInternalSchemaConverter.convert(avroSchemaWithMeta));
return Pair.of(Option.of(internalSchemaStr), Option.of(avroSchemaWithMeta.toString()));
}
LOG.warn(String.format("cannot find any valid completed commits before current cluster/compaction instantTime: %s", compactionAndClusteringInstant));
// if a commits before compaction is cleaned, try to find internalSchema by search
Option<InternalSchema> oldInternalSchemaOpt = Option.of(InternalSchemaUtils.searchSchema(Long.parseLong(compactionAndClusteringInstant), getHistoricalSchemas(metaClient)));
if (oldInternalSchemaOpt.get().isEmptySchema()) {
throw new HoodieException(String.format("failed get internalSchema for compaction/clustering instant: %s", compactionAndClusteringInstant));
}
return Pair.of(Option.empty(), Option.empty());
return Pair.of(oldInternalSchemaOpt.map(SerDeHelper::toJson),
oldInternalSchemaOpt.map(f -> AvroInternalSchemaConverter.convert(f, metaClient.getTableConfig().getTableName()).toString()));
}

/**
Expand Down
Loading