@@ -434,6 +434,6 @@ protected Configuration getHadoopConf() {

   protected String getLatestDataInstantTime() {
     return dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
-        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+        .map(HoodieInstant::getTimestamp).orElse(HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 0));
Contributor commented:
Not sure this solves/gives us much.
1: If we happen to initialize more than one MDT partition, the initialization times will differ: the suffix is 010 for the first and 011 for the second.
2: This API is used only in logging.

So it may not be worth fixing, at least for this (getLatestDataInstantTime).
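For context, the per-partition suffixing the comment refers to can be sketched as follows. This is a hypothetical illustration, not the actual Hudi implementation: the helper name mirrors createIndexInitTimestamp, but the suffix base (10) and the zero-padded formatting are assumptions.

```java
// Hypothetical sketch: deriving distinct, ordered initialization timestamps
// for each MDT partition from one base instant time by appending a
// three-digit suffix starting at 10 ("010" for the first partition,
// "011" for the second, and so on).
public class IndexInitTimestampSketch {
  // Assumed suffix base; stands in for Hudi's PARTITION_INITIALIZATION_TIME_SUFFIX.
  private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10;

  public static String createIndexInitTimestamp(String baseTimestamp, int offset) {
    // Append a zero-padded three-digit suffix to the base instant time.
    return String.format("%s%03d", baseTimestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
  }
}
```

Under this scheme two partitions initialized at the same base time still sort deterministically, which is the behavior the comment describes.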

Contributor commented:
I'm wary of the hardcoded 0; it is hard to maintain. At the least we should define a constant for it.

Member (author) @codope commented on Jun 10, 2023:
I'm going to remove this method; we don't really need it. I'll also change the log level to debug.

   }
 }
@@ -36,11 +36,11 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -471,12 +471,9 @@ public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<Hoodie

     // Only those log files which have a corresponding completed instant on the dataset should be read
     // This is because the metadata table is updated before the dataset instants are committed.
-    Set<String> validInstantTimestamps = HoodieTableMetadataUtil
-        .getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
-
+    Set<String> validInstantTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
     Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-    String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
+    String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 0));
Contributor @nsivabalan commented on Jun 9, 2023:
I don't see much benefit here either, though I don't feel too strongly about it.

This code is invoked only after some MDT partition has been initialized (which means the table config has been updated), so latestMetadataInstant should already be valid (the Option will be non-empty). So what are the chances that we call getRecordsByKey with BaseTableMetadata before any MDT partition has been initialized?

Member (author) commented:
Makes sense. Let's actually do `orElseThrow(() -> new IllegalStateException("No completed instant in the metadata timeline."))` instead of setting some initial timestamp.
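The suggested fail-fast shape can be sketched as below, with java.util.Optional standing in for Hudi's Option; the class and method names here are illustrative, not from the PR.

```java
import java.util.Optional;

// Sketch of the reviewer's suggestion: rather than substituting an initial
// timestamp when no completed instant exists, surface the broken invariant
// immediately with an exception.
public class LatestInstantSketch {
  public static String latestMetadataInstantTime(Optional<String> latestCompletedInstant) {
    return latestCompletedInstant.orElseThrow(
        () -> new IllegalStateException("No completed instant in the metadata timeline."));
  }
}
```

The trade-off being discussed: a silent fallback hides a state that should never occur once the metadata table is initialized, while throwing makes the invariant explicit at the call site.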

Contributor commented:
Yeah, I agree, we should not be in that state, so throwing an exception makes sense.
Also, this is under a public API, and construction of this class is not safe either, since we allow the object to be created even if the metadata table is not there. So, should we also throw an exception in the initIfNeeded method?

boolean allowFullScan = allowFullScanOverride.orElseGet(() -> isFullScanAllowedForPartition(partitionName));

// Load the schema
@@ -18,13 +18,6 @@

 package org.apache.hudi.metadata;

-import org.apache.avro.AvroTypeException;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -63,10 +56,19 @@
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.util.Lazy;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;

 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -1361,7 +1363,13 @@ public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMe
     });

     // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
-    validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX));
+    List<String> metadataInitializationTimestamps = metadataMetaClient.getActiveTimeline()
Contributor commented:
I am also wondering whether this gives us any benefit. validInstantTimestamps is used within LogRecordReader to ignore log blocks written by commits that are not completed yet.

Let's consider the different cases:

  1. For an existing table, we may never use the SOLO commit time, since there will definitely be a latest completed commit from the data table that we will use.
  2. For a new table, the base commit time to initialize the MDT will be chosen as SOLO COMMIT TIME + suffix (one for each partition being initialized), and bulk insert will kick in. So the base instant time will have the SOLO COMMIT TIME prefix, but any new log files will be added by new delta commits, which will have different commit times. So I don't see a necessity to add SOLO COMMIT TIME to the list of valid instant times.

Let me know if I am missing any flow. I'm just trying to avoid going through the entire active timeline of the MDT to filter for SOLO COMMIT TIME if it is never going to be used.

Contributor commented:
In other words: if valid instant times are used only to filter log blocks, and there is no flow where we might write log blocks with instant times prefixed by SOLO COMMIT TIME, we don't need to make this change.

Contributor commented:
> but any new log files will be added using new delta commits which will have diff commit times.

I agree; the SOLO_COMMIT_TIMESTAMP should be kept for backward compatibility, BTW.

Member (author) commented:
@danny0405 @nsivabalan I agree with your points, but should a util method be aware of the different cases? Let's say that tomorrow, for a new MDT partition, the semantics change and it writes a data block with the initializing commit itself; then the author/reviewer needs to come back to the util method and fix it. That is going to be harder to maintain. IMO, the better way to handle such cases is to keep the util method dumb and do any case handling at the call site, or to assert invariants such as "a data block can never have an initializing commit time". Wdyt?

Btw, the change is backward compatible, as it checks for startsWith(SOLO_COMMIT_TIMESTAMP). Also, there is no reloading of the timeline, but to avoid going through the active timeline again I can merge the filter with the stream operation at lines 1349-1350.
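The single-pass idea mentioned in the comment can be sketched roughly as below. Timestamps are plain strings, the SOLO value is a placeholder, and the helper is hypothetical; this only illustrates the shape of doing one traversal instead of two.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: one pass over the completed instant timestamps that separates
// SOLO-prefixed initialization instants from regular ones, instead of
// streaming the timeline a second time just for the prefix filter.
public class TimelineSinglePassSketch {
  static final String SOLO_COMMIT_TIMESTAMP = "00000000000000"; // placeholder value

  public static Map<Boolean, List<String>> splitByInitPrefix(List<String> completedTimestamps) {
    // partitioningBy yields {true -> SOLO-prefixed, false -> the rest} in one traversal.
    return completedTimestamps.stream()
        .collect(Collectors.partitioningBy(t -> t.startsWith(SOLO_COMMIT_TIMESTAMP)));
  }
}
```

Both groups come back from a single stream pass, which is the concern raised about re-scanning the active timeline.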

Contributor commented:
Valid instants are mainly needed for reading the log files, whereas the first bootstrap commit on the partition will create one set of dummy delete log blocks and a base file. So all the data written into the partition by the bootstrap commit will be present in the base file. Even if we don't include these SOLO_COMMIT_TIMESTAMP instants, we should be OK, I guess.
But it would be good to include them to keep things consistent.

Contributor commented:
> Even if dont include these SOLO_COMMIT_TIMESTAMP we should be ok I guess.

Yes, that's why I'm saying SOLO_COMMIT_TIMESTAMP is for compatibility.

+        .filterCompletedInstants()
+        .getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp)
+        .filter(timestamp -> timestamp.startsWith(SOLO_COMMIT_TIMESTAMP))
+        .collect(Collectors.toList());
+    validInstantTimestamps.addAll(metadataInitializationTimestamps);
     return validInstantTimestamps;
   }
