@@ -48,6 +48,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -68,6 +69,13 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.action.TableChange;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SchemaChangePersistHelper;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -78,6 +86,8 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -1409,4 +1419,138 @@ private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instant
metaClient.reloadActiveTimeline();
}
}

/**
* Add a column to the table.
*
* @param colName name of the column to add. To add a column inside a nested field, specify the full name.
* @param schema type of the column to add.
* @param doc documentation for the new column.
* @param position name of the reference column used for positioning.
* @param positionType position change type. Three change types are supported: first/after/before.
*/
public void addColumn(String colName, Schema schema, String doc, String position, TableChange.ColumnPositionChange.ColumnPositionType positionType) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper
.applyAddChange(pair.getLeft(), colName, AvroInternalSchemaConverter.convertToField(schema), doc, position, positionType);
commitTableChange(newSchema, pair.getRight());
}

public void addColumn(String colName, Schema schema) {
addColumn(colName, schema, null, "", TableChange.ColumnPositionChange.ColumnPositionType.NO_OPERATION);
}

/**
* Delete columns from the table.
*
* @param colNames names of the columns to delete. To delete a column inside a nested field, specify the full name.
*/
public void deleteColumns(String... colNames) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper.applyDeleteChange(pair.getLeft(), colNames);
commitTableChange(newSchema, pair.getRight());
}

/**
* Rename a column of the hudi table.
*
* @param colName name of the column to rename. To rename a column inside a nested field, specify the full name.
* @param newName new name for the column. The full name is not required here.
*/
public void renameColumn(String colName, String newName) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper.applyRenameChange(pair.getLeft(), colName, newName);
commitTableChange(newSchema, pair.getRight());
}

/**
* Update the nullability of a column of the hudi table.
*
* @param colName name of the column to change. To change a column inside a nested field, specify the full name.
* @param nullable whether the column may contain nulls.
*/
public void updateColumnNullability(String colName, boolean nullable) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper.applyColumnNullabilityChange(pair.getLeft(), colName, nullable);
commitTableChange(newSchema, pair.getRight());
}

/**
* Update the type of a column of the hudi table.
* Only changes from one primitive type to another primitive type are supported;
* a nested type cannot be changed to another nested type or to a primitive type, e.g. RecordType -> MapType or MapType -> LongType.
*
* @param colName name of the column to change. To change a column inside a nested field, specify the full name.
* @param newType new type for the column.
*/
public void updateColumnType(String colName, Type newType) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper.applyColumnTypeChange(pair.getLeft(), colName, newType);
commitTableChange(newSchema, pair.getRight());
}
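
To make the type-change rule concrete, a minimal sketch follows. The client instance and column names are hypothetical; Types refers to org.apache.hudi.internal.schema.Types, assumed here as the factory for internal-schema primitive types.

// legal: primitive-to-primitive change, e.g. widening an int column to long
client.updateColumnType("price", Types.LongType.get());
// illegal: a nested column cannot be changed to a primitive, so this is expected to fail
// client.updateColumnType("attributes", Types.StringType.get());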

/**
* Update the comment of a column of the hudi table.
*
* @param colName name of the column to change. To change a column inside a nested field, specify the full name.
* @param doc new comment for the column.
*/
public void updateColumnComment(String colName, String doc) {
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper.applyColumnCommentChange(pair.getLeft(), colName, doc);
commitTableChange(newSchema, pair.getRight());
}

/**
* Reorder the position of a column.
*
* @param colName name of the column to reorder. To reorder a column inside a nested field, specify the full name.
* @param referColName name of the reference column used for positioning.
* @param orderType position change type. Three change types are supported: first/after/before.
*/
public void reOrderColPosition(String colName, String referColName, TableChange.ColumnPositionChange.ColumnPositionType orderType) {
if (colName == null || orderType == null || referColName == null) {
return;
}
// get the current internal schema and meta client
Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
InternalSchema newSchema = SchemaChangePersistHelper
.applyReOrderColPositionChange(pair.getLeft(), colName, referColName, orderType);
commitTableChange(newSchema, pair.getRight());
}

private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
HoodieTableMetaClient metaClient = createMetaClient(true);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (!internalSchemaOption.isPresent()) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
return Pair.of(internalSchemaOption.get(), metaClient);
}

private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommitWithTime(instantTime, commitActionType, metaClient);
config.setSchema(schema.toString());
HoodieActiveTimeline timeLine = metaClient.getActiveTimeline();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.setOperationType(WriteOperationType.ALTER_SCHEMA);
try {
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
Map<String, String> extraMeta = new HashMap<>();
// use Long.parseLong, not Long.getLong (which would look up a system property named by instantTime)
extraMeta.put(SerDeHelper.LATESTSCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}
}
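
Taken together, these methods give the write client a small DDL-style schema-evolution API: each call resolves the current InternalSchema, applies one change, and commits it as a separate ALTER_SCHEMA instant. A hedged usage sketch (the client construction and column names are illustrative, not part of this patch):

// assumes an initialized write client, e.g. a SparkRDDWriteClient
client.addColumn("ts_ms", Schema.create(Schema.Type.LONG), "event time in epoch millis",
    "ts", TableChange.ColumnPositionChange.ColumnPositionType.AFTER);
client.renameColumn("user_name", "userName");
client.updateColumnNullability("userName", true);
client.deleteColumns("tmp_col1", "tmp_col2");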
@@ -50,6 +50,7 @@
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -550,6 +551,9 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) t
}
}
writeToFile(wrapperSchema, records);
// try to clean up old history schema files
FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(metaClient);
fss.cleanOldFiles(instants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
Member: should this be in cleaner?

Contributor Author: I am OK to move this to the cleaner.

} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);
}
@@ -48,6 +48,8 @@ public enum WriteOperationType {
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// compact
COMPACT("compact"),
// alter schema
ALTER_SCHEMA("alter_schema"),
// used for old version
Contributor: I'm not sure ALTER SCHEMA belongs among the write operations, or whether DDL operations should be put in BaseHoodieWriteClient.

Contributor Author: Thanks for your review. I think it should be a write operation: Spark SQL treats DDL operations as writes. Of course, if you think this is inappropriate, I will remove it from WriteOperationType.

Contributor: We store ALTER SCHEMA commands as separate commits like other write operations, hence the need for a separate WriteOperationType enum value.

Member: Makes sense.

UNKNOWN("unknown");

@@ -86,6 +88,8 @@ public static WriteOperationType fromValue(String value) {
return CLUSTER;
case "compact":
return COMPACT;
case "alter_schema":
return ALTER_SCHEMA;
case "unknown":
return UNKNOWN;
default:
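A quick round-trip check of the new enum value, as a hedged sketch (the value() accessor is assumed from the enum's existing pattern):

WriteOperationType op = WriteOperationType.fromValue("alter_schema");
assert op == WriteOperationType.ALTER_SCHEMA;
assert "alter_schema".equals(op.value()); // accessor name assumed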
@@ -90,6 +90,8 @@ public class HoodieTableMetaClient implements Serializable {
public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR
+ ".fileids";

public static final String SCHEMA_FOLDER_NAME = ".schema";

public static final String MARKER_EXTN = ".marker";

private String basePath;
@@ -192,6 +194,13 @@ public String getColumnStatsIndexPath() {
return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString();
}

/**
* @return schema folder path
*/
public String getSchemaFolderName() {
return new Path(metaPath, SCHEMA_FOLDER_NAME).toString();
}

/**
* @return Temp Folder path
*/
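With this change, schema files live under the meta path: a table based at /tmp/hudi_tbl keeps them in /tmp/hudi_tbl/.hoodie/.schema. A hedged sketch of resolving the folder (builder usage assumed from the surrounding codebase; the base path is hypothetical):

HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)          // an org.apache.hadoop.conf.Configuration
    .setBasePath("/tmp/hudi_tbl") // hypothetical table location
    .build();
String schemaFolder = metaClient.getSchemaFolderName(); // -> /tmp/hudi_tbl/.hoodie/.schema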
@@ -18,13 +18,6 @@

package org.apache.hudi.common.table;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -44,8 +37,18 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.io.storage.HoodieHFileReader;

import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -537,4 +540,51 @@ private boolean hasOperationField() {
return false;
}
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the latest completed instant.
*
* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
Contributor: Instead of reading the internal schema from the commit file, can we read it from the .hoodie/.schema folder (using FileBasedInternalSchemaStorageManager)?

Contributor Author: I am OK with that; just a little worried about performance. As the history schema gradually grows, reading from the commit file performs better than reading from FileBasedInternalSchemaStorageManager.

Member: Save it as an indexed file (HFile) so you can just read the last record or first record and be done? Having one source of truth would be good. We can also do this as a follow-up.

if (timeline.lastInstant().isPresent()) {
return getTableInternalSchemaFromCommitMetadata(timeline.lastInstant().get());
} else {
return Option.empty();
}
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return InternalSchema for this table
*/
private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(instant).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATESTSCHEMA);
if (latestInternalSchemaStr != null) {
return SerDeHelper.fromJson(latestInternalSchemaStr);
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
}

/**
* Gets the history schemas as a String for a hoodie table; currently read from the schema files managed by FileBasedInternalSchemaStorageManager.
*
* @return history schemas string for this table
*/
public Option<String> getTableHistorySchemaStrFromCommitMetadata() {
// currently only the FileBasedInternalSchemaStorageManager is supported
FileBasedInternalSchemaStorageManager manager = new FileBasedInternalSchemaStorageManager(metaClient);
String result = manager.getHistorySchemaStr();
return result.isEmpty() ? Option.empty() : Option.of(result);
}
}
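
Putting the resolver additions together, a hedged read-path sketch (metaClient as above; the table is assumed to have at least one completed commit carrying the serialized schema):

TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
Option<InternalSchema> latest = resolver.getTableInternalSchemaFromCommitMetadata();
if (latest.isPresent()) {
  // commitTableChange sets schemaId to the commit instant time
  System.out.println("latest schema id: " + latest.get().schemaId()); // accessor name assumed
}
Option<String> history = resolver.getTableHistorySchemaStrFromCommitMetadata();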