diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index ac342e2e3a4..5794a4c03c4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -20,14 +20,25 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.TableIdentifier; /** * Any catalog from which to access {@link IcebergTable}s. */ public interface IcebergCatalog { + IcebergTable openTable(String dbName, String tableName); + + default IcebergTable openTable(TableIdentifier tableId) { + // CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` - + // but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)` + return openTable(tableId.namespace().toString(), tableId.name()); + } + String getCatalogUri(); + void initialize(Map properties, Configuration configuration); + boolean tableAlreadyExists(IcebergTable icebergTable); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index a59fc368812..67a10a42d08 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -63,10 +63,8 @@ @Slf4j @Getter public class IcebergDataset implements PrioritizedCopyableDataset { - private final String dbName; - private final String inputTableName; private final IcebergTable srcIcebergTable; - /** Presumed destination {@link IcebergTable} exists */ + /* CAUTION: *hopefully* `destIcebergTable` exists... although that's not necessarily been verified yet */ private final IcebergTable destIcebergTable; protected final Properties properties; protected final FileSystem sourceFs; @@ -75,9 +73,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset { /** Destination database name */ public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database"; - public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) { - this.dbName = db; - this.inputTableName = table; + public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) { this.srcIcebergTable = srcIcebergTable; this.destIcebergTable = destIcebergTable; this.properties = properties; @@ -117,9 +113,9 @@ public Iterator> getFileSetIterator(FileSystem targetFs, Cop return createFileSets(targetFs, configuration); } - /** @return unique ID for this dataset, usable as a {@link CopyEntity}.fileset, for atomic publication grouping */ + /** @return unique ID for dataset (based on the source-side table), usable as a {@link CopyEntity#getFileSet}, for atomic publication grouping */ protected String getFileSetId() { - return this.dbName + "." + this.inputTableName; + return this.srcIcebergTable.getTableId().toString(); } /** @@ -127,7 +123,7 @@ protected String getFileSetId() { * comprising the iceberg/table, so as to fully specify remaining table replication. */ protected Iterator> createFileSets(FileSystem targetFs, CopyConfiguration configuration) { - FileSet fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration); + FileSet fileSet = new IcebergTableFileSet(this.getFileSetId(), this, targetFs, configuration); return Iterators.singletonIterator(fileSet); } @@ -140,7 +136,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati String fileSet = this.getFileSetId(); List copyEntities = Lists.newArrayList(); Map pathToFileStatus = getFilePathsToFileStatus(targetFs, copyConfig); - log.info("~{}.{}~ found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size()); + log.info("~{}~ found {} candidate source paths", fileSet, pathToFileStatus.size()); Configuration defaultHadoopConfiguration = new Configuration(); for (Map.Entry entry : pathToFileStatus.entrySet()) { @@ -165,8 +161,8 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati copyEntities.add(fileEntity); } // TODO: Filter properties specific to iceberg registration and avoid serializing every global property - copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName, this.properties)); - log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); + copyEntities.add(createPostPublishStep()); + log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size()); return copyEntities; } @@ -187,8 +183,8 @@ protected Map getFilePathsToFileStatus(FileSystem targetFs, Co IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly(); if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) && isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) { - log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", - dbName, inputTableName, currentSnapshotOverview.getSnapshotId(), + log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", + this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath(), currentSnapshotOverview.getMetadataPath().orElse("<>")); return Maps.newHashMap(); @@ -198,7 +194,7 @@ protected Map getFilePathsToFileStatus(FileSystem targetFs, Co Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> { // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()` String manListPath = snapshotInfo.getManifestListPath(); - log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName, + log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(), snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<>")); // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data; // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend @@ -224,18 +220,17 @@ protected Map getFilePathsToFileStatus(FileSystem targetFs, Co missingPaths.addAll(mfi.getListedFilePaths()); } } - log.info("~{}.{}~ snapshot '{}': collected {} additional source paths", - dbName, inputTableName, snapshotInfo.getSnapshotId(), missingPaths.size()); + log.info("~{}~ snapshot '{}': collected {} additional source paths", + this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size()); return missingPaths.iterator(); } else { - log.info("~{}.{}~ snapshot '{}' already present on target... skipping (including contents)", - dbName, inputTableName, snapshotInfo.getSnapshotId()); + log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)", + this.getFileSetId(), snapshotInfo.getSnapshotId()); // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list Optional metadataPath = snapshotInfo.getMetadataPath(); Optional nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p)); metadataPath.ifPresent(ignore -> - log.info("~{}.{}~ metadata IS {} already present on target", dbName, inputTableName, - nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO") + log.info("~{}~ metadata IS {} already present on target", this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO") ); return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator()); } @@ -255,7 +250,7 @@ protected Map getFilePathsToFileStatus(FileSystem targetFs, Co try { results.put(path, this.sourceFs.getFileStatus(path)); if (growthTracker.isAnotherMilestone(results.size())) { - log.info("~{}.{}~ collected file status on '{}' source paths", dbName, inputTableName, results.size()); + log.info("~{}~ collected file status on '{}' source paths", this.getFileSetId(), results.size()); } } catch (FileNotFoundException fnfe) { if (!shouldTolerateMissingSourceFiles) { @@ -265,7 +260,7 @@ protected Map getFilePathsToFileStatus(FileSystem targetFs, Co String total = ++numSourceFilesNotFound + " total"; String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete"; errorConsolidator.prepLogMsg(path).ifPresent(msg -> - log.warn("~{}.{}~ source {} ({}... {})", dbName, inputTableName, msg, speculation, total) + log.warn("~{}~ source {} ({}... {})", this.getFileSetId(), msg, speculation, total) ); } } @@ -326,8 +321,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) { return this.destIcebergTable.getDatasetDescriptor(targetFs); } - private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) { - IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName, inputTableName, properties); + private PostPublishStep createPostPublishStep() { + IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.srcIcebergTable.getTableId(), this.destIcebergTable.getTableId(), this.properties); return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index beded5a723a..dc407f38c81 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -55,22 +55,28 @@ public class IcebergDatasetFinder implements IterableDatasetFinder findDatasets() throws IOException { - List matchingDatasets = new ArrayList<>(); - if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) { - throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing", - ICEBERG_DB_NAME, ICEBERG_TABLE_NAME)); + String srcDbName = getLocationQualifiedProperty(properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY); + String destDbName = getLocationQualifiedProperty(properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY); + // TODO: eventually remove support for combo (src+dest) iceberg props, in favor of separate source/dest-scoped props; for now, maintain support + if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName)) { + srcDbName = destDbName = properties.getProperty(ICEBERG_DB_NAME_LEGACY); + } + String srcTableName = getLocationQualifiedProperty(properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY); + String destTableName = getLocationQualifiedProperty(properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY); + // TODO: eventually remove support for combo (src+dest) iceberg props, in favor of separate source/dest-scoped props; for now, maintain support + if (StringUtils.isBlank(srcTableName) || StringUtils.isBlank(destTableName)) { + srcTableName = destTableName = properties.getProperty(ICEBERG_TABLE_NAME_LEGACY); + } + if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(srcTableName)) { + throw new IllegalArgumentException( + String.format("Missing (at least some) IcebergDataset properties - source: ('%s' and '%s') and destination: ('%s' and '%s') " + + "or [deprecated!] common/combo: ('%s' and '%s')", + calcLocationQualifiedPropName(CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY), + calcLocationQualifiedPropName(CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY), + calcLocationQualifiedPropName(CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY), + calcLocationQualifiedPropName(CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY), + ICEBERG_DB_NAME_LEGACY, + ICEBERG_TABLE_NAME_LEGACY)); } - String dbName = properties.getProperty(ICEBERG_DB_NAME); - String tblName = properties.getProperty(ICEBERG_TABLE_NAME); - IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); - IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); + IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); + IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); + /* Each Iceberg dataset maps to an Iceberg table */ - matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, this.properties, this.sourceFs)); - log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), - matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one + List matchingDatasets = new ArrayList<>(); + matchingDatasets.add(createIcebergDataset(srcIcebergCatalog, srcDbName, srcTableName, destIcebergCatalog, destDbName, destTableName, this.properties, this.sourceFs)); + log.info("Found {} matching datasets: {} for the (source) '{}.{}' / (dest) '{}.{}'", matchingDatasets.size(), + matchingDatasets, srcDbName, srcTableName, destDbName, destTableName); // until future support added to specify multiple icebergs, count expected always to be one return matchingDatasets; } @@ -114,28 +138,39 @@ public Iterator getDatasetsIterator() throws IOException { } /** - * Requires both source and destination catalogs to connect to their respective {@link IcebergTable} + * Uses each source and destination {@link IcebergCatalog} to load and verify existence of the respective {@link IcebergTable} * Note: the destination side {@link IcebergTable} should be present before initiating replication + * * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable} */ - protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { - IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); - Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName)); - IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException { + IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName); + Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName)); + IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName); // TODO: Rethink strategy to enforce dest iceberg table - Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName)); - return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName)); + return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs); } protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { - String prefix = location.getConfigPrefix(); - Map catalogProperties = buildMapFromPrefixChildren(properties, prefix); + String catalogPrefix = calcLocationQualifiedPropName(location, ICEBERG_CATALOG_KEY + "."); + Map catalogProperties = buildMapFromPrefixChildren(properties, catalogPrefix); // TODO: Filter properties specific to Hadoop Configuration configuration = HadoopUtils.getConfFromProperties(properties); String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } + /** @return property value or `null` */ + protected static String getLocationQualifiedProperty(Properties properties, CatalogLocation location, String relativePropName) { + return properties.getProperty(calcLocationQualifiedPropName(location, relativePropName)); + } + + /** @return absolute (`location`-qualified) property name for `relativePropName` */ + protected static String calcLocationQualifiedPropName(CatalogLocation location, String relativePropName) { + return location.getConfigPrefix() + relativePropName; + } + /** * Filters the properties based on a prefix using {@link ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map} */ diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java index 8f32f8cc039..8beb72e66d0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -21,23 +21,30 @@ import java.util.Properties; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.commit.CommitStep; + /** * {@link CommitStep} to perform Iceberg registration. */ @Slf4j -@AllArgsConstructor public class IcebergRegisterStep implements CommitStep { - private final String dbName; - private final String tblName; + // store as string for serializability... TODO: explore whether truly necessary (or we could just as well store as `TableIdentifier`) + private final String srcTableIdStr; + private final String destTableIdStr; private final Properties properties; + public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier destTableId, Properties properties) { + this.srcTableIdStr = srcTableId.toString(); + this.destTableIdStr = destTableId.toString(); + this.properties = properties; + } + @Override public boolean isCompleted() throws IOException { return false; @@ -46,12 +53,12 @@ public boolean isCompleted() throws IOException { @Override public void execute() throws IOException { IcebergTable srcIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.SOURCE) - .openTable(this.dbName, this.tblName); + .openTable(TableIdentifier.parse(srcTableIdStr)); IcebergTable destIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION) - .openTable(this.dbName, this.tblName); + .openTable(TableIdentifier.parse(destTableIdStr)); TableMetadata destinationMetadata = null; try { - destinationMetadata = destIcebergTable.accessTableMetadata(); + destinationMetadata = destIcebergTable.accessTableMetadata(); // probe... (first access could throw) } catch (IcebergTable.TableNotFoundException tnfe) { log.warn("Destination TableMetadata doesn't exist because: " , tnfe); } @@ -59,6 +66,6 @@ public void execute() throws IOException { } @Override public String toString() { - return String.format("Registering Iceberg Table: {%s}.{%s} ", this.dbName, this.tblName); + return String.format("Registering Iceberg Table: {%s} (dest); (src: {%s})", this.destTableIdStr, this.srcTableIdStr); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index 09238445c68..0b485e1df2d 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -193,17 +193,18 @@ public void testGetFilePathsWhenAllAtDest() throws IOException { validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths); // ensure short-circuiting was able to avert iceberg manifests scan Mockito.verify(mockTable, Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly(); + Mockito.verify(mockTable, Mockito.times(1)).getTableId(); Mockito.verifyNoMoreInteractions(mockTable); } /** Exception wrapping is used internally--ensure that doesn't lapse into silently swallowing errors */ @Test(expectedExceptions = IOException.class) public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOException { - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Lists.newArrayList(SNAPSHOT_PATHS_0)); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), Lists.newArrayList(SNAPSHOT_PATHS_0)); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, null, new Properties(), sourceFs); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destFsBuilder.build(); @@ -241,10 +242,10 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); + TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, testTblName); + IcebergTable srcIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, destIcebergTbl, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -267,10 +268,10 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); + TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, testTblName); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -298,9 +299,10 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); + TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, testTblName); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -326,9 +328,10 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); + TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, testTblName); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -358,13 +361,12 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, Optional> optExistingSourcePaths, List existingDestPaths, Set expectedResultPaths) throws IOException { - IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), sourceSnapshotPathSets); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = - new IcebergDataset(testDbName, testTblName, icebergTable, null, new Properties(), sourceFs); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); destFsBuilder.addPaths(existingDestPaths); @@ -377,7 +379,7 @@ protected IcebergTable validateGetFilePathsGivenDestState(List snapshotPathSets) throws IOException { + public static IcebergTable withSnapshots(TableIdentifier tableId, List snapshotPathSets) throws IOException { IcebergTable table = Mockito.mock(IcebergTable.class); + Mockito.when(table.getTableId()).thenReturn(tableId); int lastIndex = snapshotPathSets.size() - 1; Mockito.when(table.getCurrentSnapshotInfoOverviewOnly()) .thenReturn(snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));