@@ -314,9 +314,7 @@ protected HoodieTimeline getActiveTimeline() {
   private Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath) {
     Object[] partitionColumnValues = doParsePartitionColumnValues(partitionColumns, partitionPath);
     if (shouldListLazily && partitionColumnValues.length != partitionColumns.length) {
-      throw new HoodieException("Failed to parse partition column values from the partition-path:"
-          + " likely non-encoded slashes being used in partition column's values. You can try to"
-          + " work this around by switching listing mode to eager");
+      LOG.warn(">>> PartitionColumns: " + partitionColumns + " PartitionValues: " + partitionColumnValues);
Contributor: So I assume we still need to fail here instead of printing the warning and letting it return?
     }

     return partitionColumnValues;
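If the consensus is to keep failing here, as the comment suggests, one option is to log the diagnostics and then throw — a sketch against the names in this hunk, not code from the PR. Note that concatenating the arrays directly, as the new LOG.warn does, prints JVM identity strings such as [Ljava.lang.String;@1b6d3586 rather than the contents, so Arrays.toString(...) is needed:

    if (shouldListLazily && partitionColumnValues.length != partitionColumns.length) {
      // Surface the mismatch details; Arrays.toString renders the array contents.
      LOG.warn("PartitionColumns: " + java.util.Arrays.toString(partitionColumns)
          + " PartitionValues: " + java.util.Arrays.toString(partitionColumnValues));
      throw new HoodieException("Failed to parse partition column values from the partition-path:"
          + " likely non-encoded slashes being used in partition column's values. You can try to"
          + " work this around by switching listing mode to eager");
    }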

@@ -58,7 +58,7 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext,
         shouldIncludePendingCommits,
         true,
         new NoopCache(),
-        false);
+        true);
Contributor: Now I remember we need to fix the lazy listing for the Hive File Index. Should this be in a separate PR?

Member Author: Yes, this should not have been part of this PR. Actually, it doesn't really matter for the Hudi connector, as it doesn't go through the COW input format code. And for the Hive connector, we already saw that the partition loader will instantiate this on every call. However, the actual perf issue for the Hive connector was fixed by #7527 (comment).

   }

   /**
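For context, the flipped boolean appears to be the parent BaseHoodieTableFileIndex's shouldListLazily flag — the same flag guarding the partition-parsing check in the first hunk — though the argument name is an assumption, not shown in this diff. A minimal, self-contained illustration of the eager-vs-lazy trade-off, with invented names:

    import java.util.List;

    // Invented types purely to illustrate eager vs. lazy partition listing.
    interface PartitionLister {
      List<String> listPartitionPaths(); // potentially expensive, e.g. a full storage scan
    }

    final class EagerIndex {
      private final List<String> partitions;

      EagerIndex(PartitionLister lister) {
        // Pays the full listing cost up front, even if no partition is ever read.
        this.partitions = lister.listPartitionPaths();
      }

      List<String> partitions() {
        return partitions;
      }
    }

    final class LazyIndex {
      private final PartitionLister lister;
      private List<String> partitions; // null until first access

      LazyIndex(PartitionLister lister) {
        this.lister = lister;
      }

      synchronized List<String> partitions() {
        if (partitions == null) {
          partitions = lister.listPartitionPaths(); // listed only when actually needed
        }
        return partitions;
      }
    }

Under lazy listing, partition values are also parsed on demand, which is why the mismatch check in the first file only fires when shouldListLazily is true.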

@@ -58,7 +58,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import java.io.IOException;

 import java.util.ArrayList;
 import java.util.Arrays;
@@ -82,32 +81,25 @@ public class SchemaEvolutionContext {

   private final InputSplit split;
   private final JobConf job;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   public Option<InternalSchema> internalSchemaOption;

-  public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
+  public SchemaEvolutionContext(InputSplit split, JobConf job) {
     this(split, job, Option.empty());
   }

-  public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
+  public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) {
     this.split = split;
     this.job = job;
     this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
Contributor: Now the initialization of internalSchemaOption has been removed, so callers like

    if (schemaEvolutionContext.internalSchemaOption.isPresent()) {

need to be modified.

Member Author: Oh, OK. Then there is no need for this PR.

-    if (this.metaClient == null) {
-      internalSchemaOption = Option.empty();
-      return;
-    }
-    try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-      this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
-    } catch (Exception e) {
-      internalSchemaOption = Option.empty();
-      LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e);
-    }
-    LOG.info(String.format("finish init schema evolution for split: %s", split));
   }

-  private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
+  private HoodieTableMetaClient setUpHoodieTableMetaClient() {
     try {
       Path inputPath = ((FileSplit) split).getPath();
       FileSystem fs = inputPath.getFileSystem(job);
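Since the constructor no longer assigns internalSchemaOption, existing internalSchemaOption.isPresent() call sites can now hit a null. One way to keep them safe — a sketch with an invented method name, reusing the try/catch logic the old constructor had — is a null-guarded lazy resolver:

    public Option<InternalSchema> getInternalSchemaLazily() {
      if (internalSchemaOption == null) {
        try {
          internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
        } catch (Exception e) {
          internalSchemaOption = Option.empty();
          LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e);
        }
      }
      return internalSchemaOption;
    }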
@@ -159,27 +151,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
    * Do schema evolution for ParquetFormat.
    */
   public void doEvolutionForParquetFormat() {
-    if (internalSchemaOption.isPresent()) {
-      List<String> requiredColumns = getRequireColumn(job);
-      // No need trigger schema evolution for count(*)/count(1) operation
-      boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
Contributor: To clarify, do requiredColumns contain the columns from the predicate(s), e.g., count(*) where col1 is not null?

Contributor: I see. This is existing logic. Still wondering the same question.
-      if (!disableSchemaEvolution) {
+    if (!internalSchemaOption.isPresent()) {
Contributor: Should this condition be internalSchemaOption == null, since it may not be initialized?
+      internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
Contributor: Still do a try/catch here in case the internal schema cannot be read?
+    }
     // reading hoodie schema evolution table
     job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
-      Path finalPath = ((FileSplit)split).getPath();
+    Path finalPath = ((FileSplit) split).getPath();
     InternalSchema prunedSchema;
+    List<String> requiredColumns = getRequireColumn(job);
+    // No need trigger schema evolution for count(*)/count(1) operation
+    boolean disableSchemaEvolution =
+        requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+    if (!disableSchemaEvolution) {
-        prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
-        InternalSchema querySchema = prunedSchema;
-        Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
-        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
-        InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
-            true).mergeSchema();
-        List<Types.Field> fields = mergedInternalSchema.columns();
-        setColumnNameList(job, fields);
-        setColumnTypeList(job, fields);
-        pushDownFilter(job, querySchema, fileSchema);
-      }
+      prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+      InternalSchema querySchema = prunedSchema;
+      Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+      InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+      InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
+      List<Types.Field> fields = mergedInternalSchema.columns();
+      setColumnNameList(job, fields);
+      setColumnTypeList(job, fields);
+      pushDownFilter(job, querySchema, fileSchema);
Contributor (on lines 161 to +173): And should this part be guarded by !internalSchemaOption.isPresent()?
     }
   }
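Folding the three review questions above into one shape — a null check because the field may never be assigned now, the try/catch the old constructor had, and an isPresent() guard before calling get() — could look like this sketch (not the PR's code):

    if (!disableSchemaEvolution) {
      if (internalSchemaOption == null || !internalSchemaOption.isPresent()) {
        try {
          internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
        } catch (Exception e) {
          internalSchemaOption = Option.empty();
          LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e);
        }
      }
      if (internalSchemaOption.isPresent()) {
        // prune, merge, set column names/types, and push down filters as above
      }
    }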

@@ -66,10 +66,10 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema readerSchema;
   private Schema writerSchema;
   private Schema hiveSchema;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   protected SchemaEvolutionContext schemaEvolutionContext;
   // support merge operation
-  protected boolean supportPayload = true;
+  protected boolean supportPayload;
   // handle hive type to avro record
   protected HiveAvroSerializer serializer;