@szehon-ho szehon-ho commented May 18, 2021

Fixes: #2435

Took Russell's suggestion; it's actually a lot easier to add an option to RewriteManifestsAction (which is already well distributed). Proposal: a RepairMode option; if it is REPAIR_ENTRIES, the action goes to the FileSystem to read metadata about each file and updates the manifest entry before the rewrite.

A list is added to the RewriteManifestsAction results. It includes each manifest file in which entries have been repaired, along with a summary list of the field names that were repaired.

Next steps:

- Repair split offsets (not implemented yet, as I just wanted to get a first version out)
- More modes: RepairMode.REMOVE_DELETED_FILES, RepairMode.ADD_NEW_DATA_FILES

/**
* Represents a repaired DataFile
*/
public static class RepairedDataFile {
Member Author

Needs to be public for Spark serialization

Contributor

Maybe put this comment into class's comment as well, "Needs to be public for Spark serialization"?

@szehon-ho szehon-ho changed the title Repair manifests Core : Repair manifests May 18, 2021
Contributor

@flyrain flyrain left a comment

Looks good to me overall.
Besides, we are trying to implement a snapshot integrity checker, which also needs data file statistics checking as an option. In that case, we may NOT need to repair manifests, only report any mismatch, so getting a human-readable diff report matters more. It'd be nice if the diff report could be more human readable, or at least more extensible, so that we can add more formats afterwards.

private PartitionSpec spec = null;
private Predicate<ManifestFile> predicate = manifest -> true;
private String stagingLocation = null;
private RepairMode mode = RepairMode.NONE;
Contributor

"mode" -> "repairMode", make it more descriptive?

Contributor

Do we need a list of mode here? IIUC, we are trying to add more modes, and users can select one or many modes.

* @param conf Hadoop configuration
* @param metricsSpec metrics configuration
* @param mapping name mapping
* @return metrics
Contributor

The param comments order isn't correct. Do we even need them? The names have told the story.

Member

Pretty sure our checkstyle will be mad if they are missing

Member Author

Removed the comment

static Set<String> diff(DataFile first, DataFile second) {
Set<String> result = new HashSet<>();
if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
result.add("file_size_in_bytes");
Contributor

"file_size_in_bytes" -> FILE_SIZE.name() ?
This applies to the following strings as well.

Member Author

Done
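
For illustration, a minimal sketch of the constant-based diff discussed in this thread. FileStats and DiffSketch are hypothetical stand-ins, not Iceberg classes, and the string constants only mimic what FILE_SIZE.name() would supply; the helper in the PR compares DataFile instances and may cover more fields.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the subset of DataFile stats compared here.
final class FileStats {
  final long fileSizeInBytes;
  final long recordCount;

  FileStats(long fileSizeInBytes, long recordCount) {
    this.fileSizeInBytes = fileSizeInBytes;
    this.recordCount = recordCount;
  }
}

final class DiffSketch {
  // Named constants instead of repeated string literals (illustrative names).
  static final String FILE_SIZE = "file_size_in_bytes";
  static final String RECORD_COUNT = "record_count";

  private DiffSketch() {
  }

  // Returns the names of the fields whose values differ, in a stable order.
  static Set<String> diff(FileStats first, FileStats second) {
    Set<String> result = new LinkedHashSet<>();
    if (first.fileSizeInBytes != second.fileSizeInBytes) {
      result.add(FILE_SIZE);
    }
    if (first.recordCount != second.recordCount) {
      result.add(RECORD_COUNT);
    }
    return result;
  }
}
```

Keeping the field names in one place means a report built from this set cannot drift from the names used elsewhere.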

Optional<RepairManifestHelper.RepairedDataFile> repaired =
RepairManifestHelper.repairDataFile(dataFile, broadcastTable.value(), spec, conf.value().value());
if (repaired.isPresent()) {
repairedColumns.addAll(repaired.get().repairedFields());
Contributor

Is it readable if we put the fields of multiple files in one set? Are we going to distinguish the diffs for each data file?

MetricsConfig.fromProperties(table.properties()), nameMapping));

DataFile newFile = newDfBuilder.build();
Set<String> diff = RepairManifestHelper.diff(file, newFile);
Contributor

A nit: RepairManifestHelper.diff -> diff

Member Author

Done

* @param mode repair mode
* @return this for method chaining
*/
public RewriteManifestsAction repair(org.apache.iceberg.actions.RewriteManifests.RepairMode mode) {
Member

I don't think you need to update this version since it's deprecated

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

@Deprecated
public class RewriteManifestsActionResult {
Member

Also deprecated

return this;
}

public SparkDataFile withSpecId(int newSpecId) {
Member

Did you file the proposal to add the spec ID to the data file? I know I've had a few temporary PRs where I thought about doing this. We could change the reader itself so that this is always populated, and I think there is an active PR to do this somewhere as well.

manifestEntryDF, targetNumManifests, targetNumManifestEntries);
}

List<ManifestFile> newManifests = repairedManifests.stream().map(
Member

nit:

  .stream()
  .map(RepairManifestHelper.RepairedManifestFile::manifestFile)
  .collect

@szehon-ho
Member Author

@flyrain thanks for the review. So, to understand: you would prefer a result that is a Map of individual manifest-entry changes instead of a summary of the manifest files changed? I was considering that, but feared the result would be too big.

But if we want to go this way (and extend this to a diff tool), we could change it to return a List of something like:

RepairedManifestEntry(ManifestFile parentFile, DataFile entry, List repairedFields)

Is that in line with your thoughts?


/**
* Diffs two DataFile for potential for repair
* @return a set of fields in human-readable format that differ between these DataFiles
Contributor

@RussellSpitzer, checkstyle didn't report any issue here but param comments are missing.

Member

Although for one-liners I believe we write
Return .....

without the tag, so that the summary field gets populated with the full description.

@flyrain
Contributor

flyrain commented May 19, 2021

> @flyrain thanks for the review, so to understand, you would prefer a result of Map of individual manifest-entry changes instead of a summary of manifest-files changed? I was thinking that but was fearing it would be too big of a result.

Yes, a Map works here. Your concern is valid: the size varies dramatically. For a table with 1 TB of data and an average file size of 256 MB, we get about 1,000,000/256 ≈ 4,000 data files. At roughly 100 bytes per data file, that is about 400 KB of result per TB of data, and it grows linearly with table size and shrinks file sizes make it worse, so for very large tables it could be too much.

In that sense, I'm OK with the current implementation, we can think about the different way to handle the future requirement.
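
The back-of-the-envelope estimate in this comment can be worked through in code. This is only a sketch: the class and method names are hypothetical, decimal units are assumed (1 TB = 10^12 bytes, 256 MB = 256 * 10^6 bytes), and the 100 bytes per entry is the rough figure quoted above rather than a measured value.

```java
final class ResultSizeEstimate {
  private ResultSizeEstimate() {
  }

  // Estimated number of data files, rounding up for a partial last file.
  static long estimatedFileCount(long tableBytes, long avgFileBytes) {
    return (tableBytes + avgFileBytes - 1) / avgFileBytes;
  }

  // Estimated size of a per-entry result, given an assumed cost per entry.
  static long estimatedResultBytes(long tableBytes, long avgFileBytes, long bytesPerEntry) {
    return estimatedFileCount(tableBytes, avgFileBytes) * bytesPerEntry;
  }

  public static void main(String[] args) {
    long oneTb = 1_000_000L * 1_000_000L; // 1 TB as 10^12 bytes (assumption)
    long avgFile = 256L * 1_000_000L;     // 256 MB as 256 * 10^6 bytes (assumption)
    System.out.println(estimatedFileCount(oneTb, avgFile));         // 3907
    System.out.println(estimatedResultBytes(oneTb, avgFile, 100L)); // 390700
  }
}
```

Under these assumptions a 1 TB table yields a few hundred kilobytes of per-entry results; the total scales linearly with table size and inversely with average file size.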

  public BaseRewriteManifestsActionResult(Iterable<ManifestFile> rewrittenManifests,
- Iterable<ManifestFile> addedManifests) {
+ Iterable<ManifestFile> addedManifests,
+ Iterable<BaseRepairedManifestFile> repairedManifests) {
Contributor

You probably didn't intend to use the implementation class in this API, right?

return (Iterable<RepairedManifest>) repairedManifests;
}

public static class BaseRepairedManifestFile implements RewriteManifests.Result.RepairedManifest {
Contributor

The implementation class shouldn't be public.

* @param mode repair mode
* @return this for method chaining
*/
RewriteManifests repair(RepairMode mode);
Contributor

I generally prefer to use configuration methods rather than enums in public APIs. That would also allow us to avoid the NONE option and have the actual repair be a bit more specific, like repairFileLengths(). What do you guys think, @szehon-ho, @RussellSpitzer?
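
To make the suggestion concrete, here is a hedged sketch of configuration methods replacing a public enum. RepairOptionsSketch is a hypothetical class, and repairMetrics() is an invented second option added only to show how the methods compose; only the name repairFileLengths() comes from the comment above.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

final class RepairOptionsSketch {
  // Kept internal: callers never see an enum, so no NONE value is needed.
  enum Repair { FILE_LENGTHS, METRICS }

  private final EnumSet<Repair> repairs = EnumSet.noneOf(Repair.class);

  // Each configuration method enables one specific repair and chains.
  RepairOptionsSketch repairFileLengths() {
    repairs.add(Repair.FILE_LENGTHS);
    return this;
  }

  // Hypothetical second option, included to show that options combine.
  RepairOptionsSketch repairMetrics() {
    repairs.add(Repair.METRICS);
    return this;
  }

  boolean repairsAnything() {
    return !repairs.isEmpty();
  }

  Set<Repair> selected() {
    return Collections.unmodifiableSet(repairs);
  }
}
```

A caller would write something like new RepairOptionsSketch().repairFileLengths().repairMetrics(), which also covers the earlier question about selecting more than one mode.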

public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
- this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
+ this.manifestEncoder = Encoders.javaSerialization(RepairManifestHelper.RepairedManifestFile.class);
Contributor

Are these changes actually needed? The RepairedManifestFile interface sends back the fields that were repaired. Is that specific to a file and empty if, for example, the length for all manifest entries were already correct?

I don't see much benefit to doing it that way. If we were to have more specific repair operations, then we don't need that interface at all because we'd already know what manifests fields are being fixed.

Member Author

OK, yes, I was trying to think of something useful to return to the user. But maybe it's not very useful since it's not specific, as per the earlier discussion with @flyrain in this review, and returning all patched manifest entries is a bit overkill. I'm OK with just returning the list of repaired ManifestFiles.

SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());

ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
Set<String> repairedColumns = new HashSet<String>();
Contributor

Minor: we use factory methods, like Sets.newHashSet().

SparkDataFile dataFile = wrapper.wrap(file);
if (mode == RepairMode.REPAIR_ENTRIES) {
Optional<RepairManifestHelper.RepairedDataFile> repaired =
RepairManifestHelper.repairDataFile(dataFile, broadcastTable.value(), spec, conf.value().value());
Contributor

Nit: indentation is off.

* @param mapping name mapping
* @return metrics
*/
private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
Contributor

It is rare for an Iceberg method to use get because it almost never aids understanding of what the method does. Unless the method is a getter method on a java bean and the class needs to conform to that convention, we should find a more descriptive verb.

Member Author

Renamed

@rdblue
Contributor

rdblue commented May 19, 2021

@szehon-ho, @RussellSpitzer, I'm debating whether I agree with the choice to add repair operations to the rewrite manifests action. I think it's very different and significantly more expensive to touch each data file. I think it would make sense for repairs to be done by a repair action. We can share a lot of the implementation, but I think it makes more sense to have an action dedicated to this.

@szehon-ho
Member Author

szehon-ho commented May 20, 2021

Great to see you back @rdblue :)

Yes, after this first implementation, I see some advantages to having a dedicated RepairManifestAction. RewriteManifestAction is compaction-oriented, so by design it cannot run across two separate partition specs, whereas RepairManifests should be able to, since it would not combine manifest files.

And yes, in general I see that the two can be conceptually different, like you said. I can spend some time looking at making this a separate action and refactoring the common code into a base class.

@szehon-ho szehon-ho force-pushed the repair_manifests_apache branch from a85fc18 to 45d0a8e Compare June 21, 2021 19:38
@szehon-ho
Member Author

szehon-ho commented Jun 21, 2021

As per the discussion that this deserves to be its own action rather than part of RewriteManifests, I completely rewrote RepairManifests as a separate Spark action (BaseRepairManifestsSparkAction) and moved the logic shared between it and BaseRewriteManifestAction into a base class: BaseManifestSparkAction.

In summary, it distributes the repair: it first groups all entries by ManifestFile, then calculates what needs to be repaired for each entry by reading various aspects of the data file the entry points to, and finally writes all the entries back out if any of them needed repair (the manifest file still retains the same number of entries).

Not all logic can be shared. In the repair path, the spec ID is queried from the original manifest file and kept around to write the repaired manifest file (rather than being passed in).

There is also a problem I noticed: the ManifestFiles returned by the RewriteManifests action are wrong if "snapshotIdInheritanceEnabled" is false (as this path rewrites the manifest file to a final location). So I fixed the method while extracting it from BaseRewriteManifestsSparkAction into the new base class. (A subsequent change can fix this issue and add a test in RewriteManifests.)

@rdblue @aokolnychyi @flyrain @RussellSpitzer if you guys have time for another look
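
The repair flow summarized in this comment could be sketched, single-threaded and without Spark, roughly as follows. Everything here is a hypothetical stand-in: ManifestEntry is not the Iceberg class, only the file size is repaired, and the actualSizes map replaces the file system lookup that the real action performs per data file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class RepairFlowSketch {
  // Hypothetical, simplified manifest entry: one data file and its recorded size.
  static final class ManifestEntry {
    final String manifestPath;
    final String dataFilePath;
    final long fileSizeInBytes;

    ManifestEntry(String manifestPath, String dataFilePath, long fileSizeInBytes) {
      this.manifestPath = manifestPath;
      this.dataFilePath = dataFilePath;
      this.fileSizeInBytes = fileSizeInBytes;
    }
  }

  private RepairFlowSketch() {
  }

  // Returns manifest path -> rewritten entries, only for manifests in which
  // at least one entry needed repair. Entry counts per manifest are preserved.
  static Map<String, List<ManifestEntry>> repair(
      List<ManifestEntry> entries, Map<String, Long> actualSizes) {
    // Step 1: group all entries by the manifest file they came from.
    Map<String, List<ManifestEntry>> byManifest = entries.stream()
        .collect(Collectors.groupingBy(e -> e.manifestPath, LinkedHashMap::new, Collectors.toList()));

    Map<String, List<ManifestEntry>> rewritten = new LinkedHashMap<>();
    for (Map.Entry<String, List<ManifestEntry>> group : byManifest.entrySet()) {
      boolean changed = false;
      List<ManifestEntry> out = new ArrayList<>();
      // Step 2: compare each entry against what the data file actually looks like.
      for (ManifestEntry e : group.getValue()) {
        long actual = actualSizes.getOrDefault(e.dataFilePath, e.fileSizeInBytes);
        if (actual != e.fileSizeInBytes) {
          changed = true;
          out.add(new ManifestEntry(e.manifestPath, e.dataFilePath, actual));
        } else {
          out.add(e);
        }
      }
      // Step 3: write every entry back out, but only if something was repaired.
      if (changed) {
        rewritten.put(group.getKey(), out);
      }
    }
    return rewritten;
  }
}
```

Because each manifest is processed independently, the per-manifest loop body is the unit that a distributed version would hand to separate tasks.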


@Override
public Result execute() {
String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
Member

Repairing manifests ?

Member Author

Done

@Override
public Result execute() {
String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
Member

REPAIR-MANIFESTS

Member Author

Done

.filter("status < 2") // select only live entries
.selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");

return manifestEntryDF.as("manifest_entry")
Member

I still think we should fix this in the actual ManifestEntry row and have the spec generated at read time. It feels a little odd to me to be joining to reconnect it when we knew the right ID when we read it the first time. I'm fine with this for now, but we really need to get the spec ID into that metadata table.

Member Author

I see, I will file a ticket for that if none exists. What is the policy for adding to metadata table schemas, i.e. do you know if we have some backward-compatibility policy?


String manifestName = "repaired-m-" + UUID.randomUUID();
Path newManifestPath = new Path(location, manifestName);
OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(newManifestPath.toString()));
Member

would it make sense to parallelize this as well? Using Tasks.foreach?

Member Author

Do you mean parallelizing this call: writer.existing(e.dataFile, e.snapshotId, e.sequenceNumber)?

It does not seem very thread-safe. Looking through the writers in the writer stack (ManifestWriter, AvroFileAppender, DataFileWriter, GenericAvroWriter, the Avro writers, ...), they have a lot of variables, like counts, that aren't safe to share across threads.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jul 17, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jul 24, 2024