Merged
27 commits
ced7477
[HUDI-2560] introduce id_based schema to support full schema evolution.
xiarixiaoyao Oct 15, 2021
13fad93
add test for FileBasedInternalSchemaStorageManger and rebase code
xiarixiaoyao Nov 22, 2021
3df69e7
add support for change column type and fix some test case
xiarixiaoyao Jan 26, 2022
64b5492
fix some bugs encountered in the production env and delete useless code
xiarixiaoyao Jan 26, 2022
0b468d1
fix test error
xiarixiaoyao Jan 27, 2022
c34fdf8
rebase code
xiarixiaoyao Feb 14, 2022
31a79d5
fixed some nested schema change bugs
xiarixiaoyao Feb 23, 2022
926c0be
[HUDI-2429][Stacked On HUDI-2560]Support full schema evolution for spark
xiarixiaoyao Feb 25, 2022
b1aa45e
[use dummyInternalSchema instead of null]
xiarixiaoyao Feb 26, 2022
0edac35
add support for spark3.1.x
xiarixiaoyao Feb 26, 2022
f7dde32
remove support for spark3.1.x , sicne some compile fail
xiarixiaoyao Feb 26, 2022
d808254
support spark3.1.x
xiarixiaoyao Feb 26, 2022
057e8f0
rebase and prepare solve all comments
xiarixiaoyao Mar 19, 2022
58cd3a6
address all comments
xiarixiaoyao Mar 21, 2022
8951618
rebase code
xiarixiaoyao Mar 22, 2022
7a0dc9e
fixed the count(*) bug
xiarixiaoyao Mar 22, 2022
eee91e5
try to get internalSchema by parser commit file/history file directly…
xiarixiaoyao Mar 23, 2022
3cde85a
fixed all comments
xiarixiaoyao Mar 23, 2022
d8fa679
fix new comments
xiarixiaoyao Mar 25, 2022
6828189
rebase code,fix UT failed
xiarixiaoyao Mar 26, 2022
fc707d4
fixed mistake
xiarixiaoyao Mar 29, 2022
011e886
rebase code ,fixed new comments
xiarixiaoyao Mar 31, 2022
dc3b29a
rebase code , and prepare for address new comments
xiarixiaoyao Apr 1, 2022
3c9bfc9
address commits
xiarixiaoyao Apr 1, 2022
71fcd14
address new comments
xiarixiaoyao Apr 1, 2022
c7c98b0
fix new issues
xiarixiaoyao Apr 1, 2022
2415e5c
control fallback original write logical
xiarixiaoyao Apr 1, 2022
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -49,6 +50,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -71,6 +73,15 @@
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.hudi.internal.schema.action.TableChange;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -85,6 +96,7 @@

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -101,6 +113,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;

/**
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
@@ -246,12 +260,42 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
// save internal schema to support implicitly adding columns during the write process
Member: Pull into a separate method?

Contributor Author: agree

Contributor Author: fixed

if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
&& metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
saveInternalSchema(table, instantTime, metadata);
}
// update Metadata table
writeTableMetadata(table, instantTime, commitActionType, metadata);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}

// Save internal schema
private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty()) {
InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
// TODO: save history schema via the metadata table
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
}
// update SCHEMA_KEY
Contributor: Regarding starting a separate timeline for schemas: is there a possibility to reuse the existing meta files for the internal schema? And do we plan to replace the Avro schema with the internal schema in the future? The Avro schema cannot handle data types like small int and tiny int.

Contributor Author: 1) I think DDL should be an independent operation and should not intersect with the original commit. 2) Yes, we plan to do that, but before we start we need Flink to support full schema evolution; otherwise the gap between the Flink module and the other modules will keep growing.

Contributor: So what is the relationship between the DDL schema change and the schema change on write? For schema change on write we already reuse the schema in the instant metadata file; we should elaborate more to get a uniform abstraction for these two cases.

Contributor Author: Hi Danny, do you want to ask why (line 292) we use a new timeline to save the history schema?

Contributor: With this patch we have an Avro schema in the metadata file and a separate internal schema for DDL operations, and the Avro schema can also handle the schema change on write; this abstraction is not that clear and we need to elaborate more on the behaviors.

Contributor: I guess we probably need a blog or proper documentation describing the changes at a high level. WDYT @xiarixiaoyao?

metadata.addMetadata(SCHEMA_KEY, AvroInternalSchemaConverter.convert(evolvedSchema, avroSchema.getName()).toString());
}
}
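
For orientation, the internal schema persisted above can be read back through TableSchemaResolver, the same helper this class uses further below in getInternalSchemaAndMetaClient(). A minimal sketch, assuming the standard HoodieTableMetaClient.builder() entry point and a hypothetical base path:

// Sketch: read back the InternalSchema that saveInternalSchema() stored in the commit metadata.
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;

public class InternalSchemaReadSketch {
  public static void main(String[] args) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("/tmp/hudi/demo_table")   // hypothetical table path
        .build();
    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
    Option<InternalSchema> internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata();
    if (internalSchema.isPresent()) {
      System.out.println("Latest internal schema: " + internalSchema.get());
    } else {
      System.out.println("No internal schema has been committed yet");
    }
  }
}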

protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}
@@ -1442,8 +1486,8 @@ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
}
@@ -1505,4 +1549,138 @@ private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instant
metaClient.reloadActiveTimeline();
}
}

/**
* Add a column to the table.
*
* @param colName name of the column to be added. To add a column inside a nested field, specify the full name.
* @param schema type of the column to be added.
* @param doc doc of the column to be added.
* @param position reference position for the new column.
* @param positionType column position change type. Three change types are currently supported: first/after/before.
*/
public void addColumn(String colName, Schema schema, String doc, String position, TableChange.ColumnPositionChange.ColumnPositionType positionType) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft())
.applyAddChange(colName, AvroInternalSchemaConverter.convertToField(schema), doc, position, positionType);
commitTableChange(newSchema, pair.getRight());
}

public void addColumn(String colName, Schema schema) {
addColumn(colName, schema, null, "", TableChange.ColumnPositionChange.ColumnPositionType.NO_OPERATION);
}

/**
* Delete columns from the table.
*
* @param colNames names of the columns to be deleted. To delete a column inside a nested field, specify the full name.
*/
public void deleteColumns(String... colNames) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyDeleteChange(colNames);
commitTableChange(newSchema, pair.getRight());
}

/**
* Rename a column of the Hudi table.
*
* @param colName name of the column to be renamed. To rename a column inside a nested field, specify the full name.
* @param newName new name for the column. No need to specify the full name.
*/
public void renameColumn(String colName, String newName) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyRenameChange(colName, newName);
commitTableChange(newSchema, pair.getRight());
}

/**
* Update the nullability of a column of the Hudi table.
*
* @param colName name of the column to be changed. To change a column inside a nested field, specify the full name.
* @param nullable whether the column should be nullable.
*/
public void updateColumnNullability(String colName, boolean nullable) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyColumnNullabilityChange(colName, nullable);
commitTableChange(newSchema, pair.getRight());
}

/**
* Update the type of a column of the Hudi table.
* Only updates from one primitive type to another primitive type are supported;
* a nested type cannot be changed to another nested type or to a primitive type, e.g. RecordType -> MapType or MapType -> LongType.
*
* @param colName name of the column to be changed. To change a column inside a nested field, specify the full name.
* @param newType new type for the column.
*/
public void updateColumnType(String colName, Type newType) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyColumnTypeChange(colName, newType);
commitTableChange(newSchema, pair.getRight());
}

/**
* Update the comment of a column of the Hudi table.
*
* @param colName name of the column to be changed. To change a column inside a nested field, specify the full name.
* @param doc new comment for the column.
*/
public void updateColumnComment(String colName, String doc) {
Contributor: Where are all these functions getting used? I do not see any caller for these @xiarixiaoyao.

Contributor Author: These are the exposed API interfaces. I recommend doing DDL operations through Spark SQL (#5238).

Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyColumnCommentChange(colName, doc);
commitTableChange(newSchema, pair.getRight());
}

/**
* Reorder the position of a column.
*
* @param colName column to be reordered. To reorder a column inside a nested field, specify the full name.
* @param referColName reference column for the new position.
* @param orderType column position change type. Three change types are currently supported: first/after/before.
*/
public void reOrderColPosition(String colName, String referColName, TableChange.ColumnPositionChange.ColumnPositionType orderType) {
if (colName == null || orderType == null || referColName == null) {
return;
}
//get internalSchema
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft())
.applyReOrderColPositionChange(colName, referColName, orderType);
commitTableChange(newSchema, pair.getRight());
}
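
Taken together, the methods above form the write-client-level DDL surface for schema evolution. A minimal usage sketch follows; the enclosing client class name (BaseHoodieWriteClient), the column names, and the AFTER/BEFORE constants (inferred from the first/after/before position types named in the javadocs) are illustrative assumptions, not part of this diff.

// Sketch: driving the schema-evolution DDL methods from application code.
// `client` is assumed to be an already-initialized write client for the target table.
import org.apache.avro.Schema;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.internal.schema.action.TableChange;

public class SchemaDdlSketch {
  @SuppressWarnings("rawtypes")
  public static void evolve(BaseHoodieWriteClient client) {
    // add a nullable string column using the simple overload (no position change)
    client.addColumn("note", Schema.createUnion(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));

    // add a double column right after the (illustrative) "price" column
    client.addColumn("discount", Schema.create(Schema.Type.DOUBLE), "order discount",
        "price", TableChange.ColumnPositionChange.ColumnPositionType.AFTER);

    // rename a column; nested fields would be addressed by their full dotted name
    client.renameColumn("note", "remark");

    // drop a column that is no longer needed
    client.deleteColumns("obsolete_flag");

    // move "remark" before "price"
    client.reOrderColPosition("remark", "price",
        TableChange.ColumnPositionChange.ColumnPositionType.BEFORE);
  }
}

Each call goes through commitTableChange() below, so every change lands as its own ALTER_SCHEMA instant on the timeline rather than piggybacking on a data write.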

private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
HoodieTableMetaClient metaClient = createMetaClient(true);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (!internalSchemaOption.isPresent()) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
return Pair.of(internalSchemaOption.get(), metaClient);
}

private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommitWithTime(instantTime, commitActionType, metaClient);
config.setSchema(schema.toString());
HoodieActiveTimeline timeLine = metaClient.getActiveTimeline();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.setOperationType(WriteOperationType.ALTER_SCHEMA);
try {
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
Contributor: What is the purpose of storing the history schema here? I guess this is redundant, since we already store the evolved schema as the history schema in the saveInternalSchema() method, which gets called from commitStats(). WDYT @xiarixiaoyao? Also, can you share your Slack id with me? It will make it easier to coordinate with you.

Contributor Author: Of course, I already pinged you on Slack.

commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
}
}
@@ -167,6 +167,22 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema "
+ "evolving records during an update.");

public static final ConfigProperty<String> INTERNAL_SCHEMA_STRING = ConfigProperty
Member: will revisit these docs.

.key("hoodie.internal.schema")
.noDefaultValue()
.withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
+ "implementations of evolution of schema");

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
Member: This almost makes it seem like Hudi does not evolve schemas today. Could we call this schema.on.read.enable? (What we have today is a more conservative schema-on-write approach.) What are your thoughts?

Contributor Author: No problem, let me modify it.

Contributor Author: fixed

Contributor: @xiarixiaoyao: It is better to prefix the config with hoodie, so hoodie.schema.on.read.enable.

Contributor Author: fixed


.key("hoodie.schema.on.read.enable")
.defaultValue(false)
.withDocumentation("enable full schema evolution for hoodie");

public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty
.key("hoodie.schema.cache.enable")
.defaultValue(false)
.withDocumentation("cache query internalSchemas in driver/executor side");

public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
@@ -886,6 +902,30 @@ public void setSchema(String schemaStr) {
setValue(AVRO_SCHEMA_STRING, schemaStr);
}

public String getInternalSchema() {
return getString(INTERNAL_SCHEMA_STRING);
}

public boolean getInternalSchemaCacheEnable() {
return getBoolean(ENABLE_INTERNAL_SCHEMA_CACHE);
}

public void setInternalSchemaString(String internalSchemaString) {
setValue(INTERNAL_SCHEMA_STRING, internalSchemaString);
}

public void setInternalSchemaCacheEnable(boolean enable) {
setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
}

public boolean getSchemaEvolutionEnable() {
return getBoolean(SCHEMA_EVOLUTION_ENABLE);
}

public void setSchemaEvolutionEnable(boolean enable) {
setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
}

/**
* Get the write schema for written records.
*
@@ -2075,6 +2115,16 @@ public Builder withSchema(String schemaStr) {
return this;
}

public Builder withSchemaEvolutionEnable(boolean enable) {
writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
return this;
}

public Builder withInternalSchemaCacheEnable(boolean enable) {
writeConfig.setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
return this;
}
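
For completeness, a small sketch of wiring the new flags through the builder; newBuilder() and withPath() are the usual HoodieWriteConfig entry points and are assumed here, while withSchema(), withSchemaEvolutionEnable() and withInternalSchemaCacheEnable() are the builder methods defined in this class.

// Sketch: building a write config with schema-on-read evolution enabled.
import org.apache.hudi.config.HoodieWriteConfig;

public class SchemaEvolutionConfigSketch {
  public static HoodieWriteConfig build(String basePath, String avroSchemaJson) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)                    // assumed standard builder option
        .withSchema(avroSchemaJson)            // latest Avro write schema
        .withSchemaEvolutionEnable(true)       // hoodie.schema.on.read.enable
        .withInternalSchemaCacheEnable(true)   // hoodie.schema.cache.enable
        .build();
  }
}

With hoodie.schema.on.read.enable set, commitStats() persists the evolved internal schema alongside the regular commit metadata, as shown in saveInternalSchema() earlier in this diff.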

public Builder withAvroSchemaValidate(boolean enable) {
writeConfig.setValue(AVRO_SCHEMA_VALIDATE_ENABLE, String.valueOf(enable));
return this;
@@ -39,6 +39,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;

@@ -240,6 +241,14 @@ public HoodieCleanMetadata execute() {
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (pendingCleanInstants.size() > 0) {
// try to clean old history schema.
try {
FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
fss.cleanOldFiles(pendingCleanInstants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList()));
} catch (Exception e) {
// We should not affect the original clean logic; swallow the exception and log a warning.
LOG.warn("Failed to clean old history schema files", e);
}
pendingCleanInstants.forEach(hoodieInstant -> {
if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);