
Commit d9cc545

fix new comments
1 parent 9a2e4c1 commit d9cc545

17 files changed: +82 additions, -51 deletions

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 1 addition & 0 deletions
@@ -264,6 +264,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
       InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
       if (evolvedSchema.equals(internalSchema)) {
         metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
+        // TODO: save the history schema via the metadata table
         schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
       } else {
         evolvedSchema.setSchemaId(Long.parseLong(instantTime));

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java

Lines changed: 2 additions & 0 deletions
@@ -155,6 +155,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
     if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
       readerSchema = new Schema.Parser().parse(config.getSchema());
       internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
+      // It is safe to modify the config here, since we are running on the task side.
+      ((HoodieTable) compactionHandler).getConfig().setDefault(config);
     } else {
       readerSchema = HoodieAvroUtils.addMetadataFields(
           new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java

Lines changed: 1 addition & 0 deletions
@@ -97,6 +97,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
     if (schemaPair.getLeft().isPresent()) {
       metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
+      metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
     }
     compactionMetadata.setWriteStatuses(statuses);
     compactionMetadata.setCommitted(false);

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java

Lines changed: 1 addition & 0 deletions
@@ -79,6 +79,7 @@ private SparkInternalSchemaConverter() {
   public static final String HOODIE_QUERY_SCHEMA = "hoodie.schema.internal.querySchema";
   public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
   public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";
+
   /**
    * Converts a Spark schema to a Hudi internal schema. Fields without IDs are kept and assigned fallback IDs.
    *
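For context on the javadoc above, here is a toy editorial sketch of the "fallback IDs" idea: fields that arrive without IDs receive sequential IDs in declaration order. The class, field names, and sequential-ID rule are illustrative assumptions, not the converter's actual logic.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: assign sequential fallback IDs to fields that have none.
final class FallbackIdSketch {
  static Map<String, Integer> assignFallbackIds(List<String> fieldNames) {
    Map<String, Integer> ids = new LinkedHashMap<>();
    int nextId = 0;
    for (String name : fieldNames) {
      ids.put(name, nextId++); // IDs follow declaration order
    }
    return ids;
  }

  public static void main(String[] args) {
    // prints {id=0, name=1, price=2}
    System.out.println(assignFallbackIds(Arrays.asList("id", "name", "price")));
  }
}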

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 1 addition & 1 deletion
@@ -935,7 +935,7 @@ public boolean hasNext() {

       @Override
       public GenericRecord next() {
-        return rewriteRecord(oldRecords.next(), newSchema);
+        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
       }
     };
   }
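Editorial note: the switch matters because rewriteRecordWithNewSchema can also convert values whose type changed between schema versions, which a plain field-by-field rewrite cannot. Below is a minimal sketch of that idea using only the standard Avro API; the string-to-double conversion and the class name are illustrative assumptions, not Hudi's implementation.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Minimal sketch: copy a record into an evolved schema, converting a field
// whose type changed from string to double along the way.
final class RewriteSketch {
  static GenericRecord rewriteWithTypeChange(GenericRecord oldRecord, Schema newSchema) {
    GenericRecord newRecord = new GenericData.Record(newSchema);
    for (Schema.Field field : newSchema.getFields()) {
      Object value = oldRecord.get(field.name());
      if (value instanceof CharSequence && field.schema().getType() == Schema.Type.DOUBLE) {
        // The column type evolved: parse the old string value into the new type.
        newRecord.put(field.name(), Double.parseDouble(value.toString()));
      } else {
        newRecord.put(field.name(), value);
      }
    }
    return newRecord;
  }
}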

hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java

Lines changed: 1 addition & 1 deletion
@@ -559,7 +559,7 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
    */
   private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
     try {
-      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedInstants();
       byte[] data = timeline.getInstantDetails(instant).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);

hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ public abstract class AbstractHoodieLogRecordReader {
   // Total log files read - for metrics
   private AtomicLong totalLogFiles = new AtomicLong(0);
   // Internal schema
-  private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
+  private InternalSchema internalSchema;
   private final String path;
   // Total log blocks read - for metrics
   private AtomicLong totalLogBlocks = new AtomicLong(0);

hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java

Lines changed: 4 additions & 14 deletions
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -68,10 +69,9 @@ public InternalSchema(Field... columns) {
   public InternalSchema(long versionId, List<Field> cols) {
     this.versionId = versionId;
     this.record = RecordType.get(cols);
-    if (versionId >= 0) {
-      buildIdToName();
-      maxColumnId = idToName.keySet().stream().max(Comparator.comparing(Integer::valueOf)).get();
-    }
+    idToName = cols.isEmpty() ? new HashMap<>() : InternalSchemaBuilder.getBuilder().buildIdToName(record);
+    nameToId = cols.isEmpty() ? new HashMap<>() : idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+    maxColumnId = idToName.isEmpty() ? -1 : idToName.keySet().stream().max(Comparator.comparing(Integer::valueOf)).get();
   }

   public InternalSchema(long versionId, int maxColumnId, List<Field> cols) {
@@ -90,20 +90,10 @@ public RecordType getRecord() {
   }

   private Map<Integer, String> buildIdToName() {
-    if (idToName == null) {
-      idToName = InternalSchemaBuilder.getBuilder().buildIdToName(record);
-    }
     return idToName;
   }

   private Map<String, Integer> buildNameToId() {
-    if (nameToId == null) {
-      if (idToName != null && !idToName.isEmpty()) {
-        nameToId = idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
-        return nameToId;
-      }
-      nameToId = InternalSchemaBuilder.getBuilder().buildNameToId(record);
-    }
     return nameToId;
   }
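Editorial note on the constructor change: both lookup maps are now built eagerly at construction time (the forward map first, then its inversion), so the former lazy buildIdToName/buildNameToId bodies collapse into plain getters. A self-contained sketch of that pattern follows; the class and shapes are illustrative, not the real InternalSchema.

import java.util.HashMap;
import java.util.Map;

// Sketch of the eager-initialization pattern: build id -> name once,
// invert it into name -> id, and precompute the max id.
final class BiIndexSketch {
  private final Map<Integer, String> idToName;
  private final Map<String, Integer> nameToId;
  private final int maxColumnId;

  BiIndexSketch(Map<Integer, String> source) {
    this.idToName = new HashMap<>(source);
    this.nameToId = new HashMap<>();
    // Inversion assumes names are unique, as column names are within one schema.
    idToName.forEach((id, name) -> nameToId.put(name, id));
    this.maxColumnId = idToName.keySet().stream().max(Integer::compare).orElse(-1);
  }

  int maxColumnId() {
    return maxColumnId; // -1 for an empty schema, mirroring the new fallback
  }
}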

hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,13 @@ public class InternalSchemaMerger {
   // if mergeRequiredFiledForce is true, we will ignore the column's required attribute.
   private final boolean ignoreRequiredAttribute;
   // Whether to use the column type from the file schema to read files when we find that a column's type has changed.
+  // Spark's parquet reader needs the original column type to read data; otherwise the read will fail.
+  // E.g. if a column's type was StringType and has been changed to DecimalType,
+  // we must pass StringType (not DecimalType) to the parquet reader; once the data is read out,
+  // we convert it from String to Decimal and everything works.
+  // The log reader does not need this parameter,
+  // since our rewriteRecordWithNewSchema function supports rewriting directly:
+  // in the same example, we can pass DecimalType to rewriteRecordWithNewSchema as-is.
   private boolean useColumnTypeFromFileSchema = true;

   public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
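To make the comment above concrete, here is a tiny editorial sketch of the read-then-convert path that useColumnTypeFromFileSchema enables; the class and values are illustrative assumptions, not code from this patch.

import java.math.BigDecimal;

// Sketch: the reader is handed the file schema's original type (String),
// and the conversion to the evolved type (Decimal) happens after the read.
final class TypeConvertSketch {
  static BigDecimal readThenConvert(String rawValueFromFile) {
    // The parquet reader never sees DecimalType; we convert afterwards.
    return new BigDecimal(rawValueFromFile);
  }

  public static void main(String[] args) {
    System.out.println(readThenConvert("12.34")); // 12.34
  }
}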

hudi-common/src/main/java/org/apache/hudi/internal/schema/io/AbstractInternalSchemaStorageManager.java

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ abstract class AbstractInternalSchemaStorageManager {
   /**
    * Get the latest history schema string.
    * Uses the given validCommits to validate all legal history schema files, and returns the latest one.
+   * If the passed valid commits list is null or empty, valid instants will be fetched from the file system and used instead.
    */
   public abstract String getHistorySchemaStrByGivenValidCommits(List<String> validCommits);
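A standalone editorial sketch of the fallback contract the new javadoc line describes; the class and helper names are hypothetical, not the actual storage manager implementation.

import java.util.Collections;
import java.util.List;

// Hypothetical sketch: when no valid commits are supplied, fall back to
// listing valid instants from the file system before resolving the schema.
class HistorySchemaLookupSketch {
  String getHistorySchemaStrByGivenValidCommits(List<String> validCommits) {
    List<String> commits = (validCommits == null || validCommits.isEmpty())
        ? fetchValidInstantsFromFileSystem() // hypothetical helper
        : validCommits;
    return findLatestHistorySchemaFile(commits); // hypothetical helper
  }

  private List<String> fetchValidInstantsFromFileSystem() {
    return Collections.emptyList(); // stand-in for a real file-system listing
  }

  private String findLatestHistorySchemaFile(List<String> commits) {
    return commits.isEmpty() ? "" : commits.get(commits.size() - 1);
  }
}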
