4141import org .apache .iceberg .relocated .com .google .common .collect .Lists ;
4242import org .apache .iceberg .relocated .com .google .common .collect .Sets ;
4343import org .apache .iceberg .util .CharSequenceSet ;
44+ import org .apache .iceberg .util .Pair ;
4445import org .slf4j .Logger ;
4546import org .slf4j .LoggerFactory ;
4647
@@ -62,6 +63,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
6263 ImmutableSet .of (DataOperations .OVERWRITE , DataOperations .REPLACE , DataOperations .DELETE );
6364 private static final Set <String > VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
6465 ImmutableSet .of (DataOperations .OVERWRITE , DataOperations .REPLACE );
66+ // delete files are only added in "overwrite" operations
67+ private static final Set <String > VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
68+ ImmutableSet .of (DataOperations .OVERWRITE );
6569
6670 private final String tableName ;
6771 private final TableOperations ops ;
@@ -253,28 +257,10 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
253257 return ;
254258 }
255259
256- List <ManifestFile > manifests = Lists .newArrayList ();
257- Set <Long > newSnapshots = Sets .newHashSet ();
258-
259- Long currentSnapshotId = base .currentSnapshot ().snapshotId ();
260- while (currentSnapshotId != null && !currentSnapshotId .equals (startingSnapshotId )) {
261- Snapshot currentSnapshot = ops .current ().snapshot (currentSnapshotId );
262-
263- ValidationException .check (currentSnapshot != null ,
264- "Cannot determine history between starting snapshot %s and current %s" ,
265- startingSnapshotId , currentSnapshotId );
266-
267- if (VALIDATE_ADDED_FILES_OPERATIONS .contains (currentSnapshot .operation ())) {
268- newSnapshots .add (currentSnapshotId );
269- for (ManifestFile manifest : currentSnapshot .dataManifests ()) {
270- if (manifest .snapshotId () == (long ) currentSnapshotId ) {
271- manifests .add (manifest );
272- }
273- }
274- }
275-
276- currentSnapshotId = currentSnapshot .parentId ();
277- }
260+ Pair <List <ManifestFile >, Set <Long >> history =
261+ validationHistory (base , startingSnapshotId , VALIDATE_ADDED_FILES_OPERATIONS , ManifestContent .DATA );
262+ List <ManifestFile > manifests = history .first ();
263+ Set <Long > newSnapshots = history .second ();
278264
279265 ManifestGroup conflictGroup = new ManifestGroup (ops .io (), manifests , ImmutableList .of ())
280266 .caseSensitive (caseSensitive )
@@ -297,6 +283,38 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
297283 }
298284 }
299285
286+ /**
287+ * Validates that no new delete files have been added to the table since a starting snapshot.
288+ *
289+ * @param base table metadata to validate
290+ * @param startingSnapshotId id of the snapshot current at the start of the operation
291+ * @param dataFiles data files to validate have no new row deletes
292+ */
293+ protected void validateNoNewDeletesForDataFiles (TableMetadata base , Long startingSnapshotId ,
294+ Iterable <DataFile > dataFiles ) {
295+ // if there is no current table state, no files have been added
296+ if (base .currentSnapshot () == null ) {
297+ return ;
298+ }
299+
300+ Pair <List <ManifestFile >, Set <Long >> history =
301+ validationHistory (base , startingSnapshotId , VALIDATE_REPLACED_DATA_FILES_OPERATIONS , ManifestContent .DELETES );
302+ List <ManifestFile > deleteManifests = history .first ();
303+
304+ long startingSequenceNumber = startingSnapshotId == null ? 0 : base .snapshot (startingSnapshotId ).sequenceNumber ();
305+ DeleteFileIndex deletes = DeleteFileIndex .builderFor (ops .io (), deleteManifests )
306+ .afterSequenceNumber (startingSequenceNumber )
307+ .specsById (ops .current ().specsById ())
308+ .build ();
309+
310+ for (DataFile dataFile : dataFiles ) {
311+ // if any delete is found that applies to files written in or before the starting snapshot, fail
312+ if (deletes .forDataFile (startingSequenceNumber , dataFile ).length > 0 ) {
313+ throw new ValidationException ("Cannot commit, found new delete for replaced data file: %s" , dataFile );
314+ }
315+ }
316+ }
317+
300318 @ SuppressWarnings ("CollectionUndefinedEquality" )
301319 protected void validateDataFilesExist (TableMetadata base , Long startingSnapshotId ,
302320 CharSequenceSet requiredDataFiles , boolean skipDeletes ) {
@@ -309,6 +327,31 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
309327 VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS :
310328 VALIDATE_DATA_FILES_EXIST_OPERATIONS ;
311329
330+ Pair <List <ManifestFile >, Set <Long >> history =
331+ validationHistory (base , startingSnapshotId , matchingOperations , ManifestContent .DATA );
332+ List <ManifestFile > manifests = history .first ();
333+ Set <Long > newSnapshots = history .second ();
334+
335+ ManifestGroup matchingDeletesGroup = new ManifestGroup (ops .io (), manifests , ImmutableList .of ())
336+ .filterManifestEntries (entry -> entry .status () != ManifestEntry .Status .ADDED &&
337+ newSnapshots .contains (entry .snapshotId ()) && requiredDataFiles .contains (entry .file ().path ()))
338+ .specsById (base .specsById ())
339+ .ignoreExisting ();
340+
341+ try (CloseableIterator <ManifestEntry <DataFile >> deletes = matchingDeletesGroup .entries ().iterator ()) {
342+ if (deletes .hasNext ()) {
343+ throw new ValidationException ("Cannot commit, missing data files: %s" ,
344+ Iterators .toString (Iterators .transform (deletes , entry -> entry .file ().path ().toString ())));
345+ }
346+
347+ } catch (IOException e ) {
348+ throw new UncheckedIOException ("Failed to validate required files exist" , e );
349+ }
350+ }
351+
352+ private Pair <List <ManifestFile >, Set <Long >> validationHistory (TableMetadata base , Long startingSnapshotId ,
353+ Set <String > matchingOperations ,
354+ ManifestContent content ) {
312355 List <ManifestFile > manifests = Lists .newArrayList ();
313356 Set <Long > newSnapshots = Sets .newHashSet ();
314357
@@ -322,31 +365,25 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
322365
323366 if (matchingOperations .contains (currentSnapshot .operation ())) {
324367 newSnapshots .add (currentSnapshotId );
325- for (ManifestFile manifest : currentSnapshot .dataManifests ()) {
326- if (manifest .snapshotId () == (long ) currentSnapshotId ) {
327- manifests .add (manifest );
368+ if (content == ManifestContent .DATA ) {
369+ for (ManifestFile manifest : currentSnapshot .dataManifests ()) {
370+ if (manifest .snapshotId () == (long ) currentSnapshotId ) {
371+ manifests .add (manifest );
372+ }
373+ }
374+ } else {
375+ for (ManifestFile manifest : currentSnapshot .deleteManifests ()) {
376+ if (manifest .snapshotId () == (long ) currentSnapshotId ) {
377+ manifests .add (manifest );
378+ }
328379 }
329380 }
330381 }
331382
332383 currentSnapshotId = currentSnapshot .parentId ();
333384 }
334385
335- ManifestGroup matchingDeletesGroup = new ManifestGroup (ops .io (), manifests , ImmutableList .of ())
336- .filterManifestEntries (entry -> entry .status () != ManifestEntry .Status .ADDED &&
337- newSnapshots .contains (entry .snapshotId ()) && requiredDataFiles .contains (entry .file ().path ()))
338- .specsById (base .specsById ())
339- .ignoreExisting ();
340-
341- try (CloseableIterator <ManifestEntry <DataFile >> deletes = matchingDeletesGroup .entries ().iterator ()) {
342- if (deletes .hasNext ()) {
343- throw new ValidationException ("Cannot commit, missing data files: %s" ,
344- Iterators .toString (Iterators .transform (deletes , entry -> entry .file ().path ().toString ())));
345- }
346-
347- } catch (IOException e ) {
348- throw new UncheckedIOException ("Failed to validate required files exist" , e );
349- }
386+ return Pair .of (manifests , newSnapshots );
350387 }
351388
352389 @ Override
0 commit comments