diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 43476d21daed..b598bcb19bd8 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -198,12 +198,16 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
         loadMetadataTable(table, ENTRIES)
             .filter("status < 2") // select only live entries
             .selectExpr(
-                "input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
+                "input_file_name() as manifest",
+                "snapshot_id",
+                "sequence_number",
+                "file_sequence_number",
+                "data_file");
 
     Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
     return manifestEntryDF
         .join(manifestDF, joinCond, "left_semi")
-        .select("snapshot_id", "sequence_number", "data_file");
+        .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
   }
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
@@ -374,8 +378,9 @@ private static ManifestFile writeManifest(
         Row row = rows.get(index);
         long snapshotId = row.getLong(0);
         long sequenceNumber = row.getLong(1);
-        Row file = row.getStruct(2);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
+        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+        Row file = row.getStruct(3);
+        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
       }
     } finally {
       writer.close();
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
new file mode 100644
index 000000000000..70ab04f0a080
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+
+public class ValidationHelpers {
+
+  private ValidationHelpers() {}
+
+  public static List<Long> dataSeqs(Long... seqs) {
+    return Arrays.asList(seqs);
+  }
+
+  public static List<Long> fileSeqs(Long... seqs) {
+    return Arrays.asList(seqs);
+  }
+
+  public static List<Long> snapshotIds(Long... ids) {
+    return Arrays.asList(ids);
+  }
+
+  public static List<String> files(ContentFile<?>... files) {
+    return Arrays.stream(files).map(file -> file.path().toString()).collect(Collectors.toList());
+  }
+
+  public static void validateDataManifest(
+      Table table,
+      ManifestFile manifest,
+      List<Long> dataSeqs,
+      List<Long> fileSeqs,
+      List<Long> snapshotIds,
+      List<String> files) {
+
+    List<Long> actualDataSeqs = Lists.newArrayList();
+    List<Long> actualFileSeqs = Lists.newArrayList();
+    List<Long> actualSnapshotIds = Lists.newArrayList();
+    List<String> actualFiles = Lists.newArrayList();
+
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, table.io()).entries()) {
+      actualDataSeqs.add(entry.dataSequenceNumber());
+      actualFileSeqs.add(entry.fileSequenceNumber());
+      actualSnapshotIds.add(entry.snapshotId());
+      actualFiles.add(entry.file().path().toString());
+    }
+
+    assertSameElements("data seqs", actualDataSeqs, dataSeqs);
+    assertSameElements("file seqs", actualFileSeqs, fileSeqs);
+    assertSameElements("snapshot IDs", actualSnapshotIds, snapshotIds);
+    assertSameElements("files", actualFiles, files);
+  }
+
+  private static <T> void assertSameElements(String context, List<T> actual, List<T> expected) {
+    String errorMessage = String.format("%s must match", context);
+    Assertions.assertThat(actual).as(errorMessage).hasSameElementsAs(expected);
+  }
+}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4b50ea0c29f3..95f2f12d5ff8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iceberg.spark.actions;
 
+import static org.apache.iceberg.ValidationHelpers.dataSeqs;
+import static org.apache.iceberg.ValidationHelpers.fileSeqs;
+import static org.apache.iceberg.ValidationHelpers.files;
+import static org.apache.iceberg.ValidationHelpers.snapshotIds;
+import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -29,6 +34,7 @@
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -38,6 +44,7 @@
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -471,6 +478,67 @@ public void testRewriteManifestsWithPredicate() throws IOException {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteSmallManifestsNonPartitionedV2Table() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+    Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, null, "AAAA"));
+    writeRecords(records1);
+
+    table.refresh();
+
+    Snapshot snapshot1 = table.currentSnapshot();
+    DataFile file1 = Iterables.getOnlyElement(snapshot1.addedDataFiles(table.io()));
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "CCCC", "CCCC"));
+    writeRecords(records2);
+
+    table.refresh();
+
+    Snapshot snapshot2 = table.currentSnapshot();
+    DataFile file2 = Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+    RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
+    Assert.assertEquals(
+        "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
+
+    table.refresh();
+
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
+    Assert.assertEquals(2, (long) newManifest.existingFilesCount());
+    Assert.assertFalse(newManifest.hasAddedFiles());
+    Assert.assertFalse(newManifest.hasDeletedFiles());
+
+    validateDataManifest(
+        table,
+        newManifest,
+        dataSeqs(1L, 2L),
+        fileSeqs(1L, 2L),
+        snapshotIds(snapshot1.snapshotId(), snapshot2.snapshotId()),
+        files(file1, file2));
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords =
+        resultDF.sort("c1", "c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);
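
Reviewer note: the action-side fix works because the entries metadata table already exposes file_sequence_number next to sequence_number, so the rewrite only has to carry the extra column through the join and pass it to the writer's existing(...) call. A minimal sketch for inspecting those columns on a path-based (Hadoop) table, assuming the same spark session and tableLocation fixtures the test class uses:

    // Load the entries metadata table for the table under test.
    // "status < 2" keeps only live entries (0 = existing, 1 = added),
    // mirroring the filter in buildManifestEntryDF above.
    Dataset<Row> entries =
        spark.read().format("iceberg").load(tableLocation + "#entries");
    entries
        .filter("status < 2")
        .selectExpr("snapshot_id", "sequence_number", "file_sequence_number")
        .show();

Entries may report a null file_sequence_number (for example, metadata written before the field was tracked), which is why writeManifest null-checks column 2 instead of calling row.getLong(2) unconditionally.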