Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,25 @@
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;


/**
 * Abstraction over any catalog through which {@link IcebergTable}s may be accessed.
 */
public interface IcebergCatalog {

  /**
   * Opens the table named by the given database and table names.
   *
   * @param dbName name of the database holding the table
   * @param tableName name of the table within {@code dbName}
   * @return the {@link IcebergTable} for that database/table pair
   */
  IcebergTable openTable(String dbName, String tableName);

  /**
   * Opens the table named by {@code tableId}, by delegating to {@link #openTable(String, String)} with the
   * string form of the identifier's namespace as the database name and the identifier's name as the table name.
   *
   * @param tableId identifier of the table to open
   * @return the {@link IcebergTable} for {@code tableId}
   */
  default IcebergTable openTable(TableIdentifier tableId) {
    // DESIGN NOTE: the reverse arrangement would clearly be preferable - `openTable(String, String)` expressed in
    // terms of `openTable(TableIdentifier)` - but that is hard to adopt at this point, since multiple derived classes
    // already "in the wild" implement `openTable(String, String)`
    String dbName = tableId.namespace().toString();
    String tableName = tableId.name();
    return openTable(dbName, tableName);
  }

  /** @return the URI of this catalog */
  String getCatalogUri();

  /**
   * Initializes this catalog.
   *
   * @param properties catalog properties, as string key/value pairs
   * @param configuration the Hadoop {@link Configuration} to use
   */
  void initialize(Map<String, String> properties, Configuration configuration);

  /**
   * @param icebergTable the table to check
   * @return whether {@code icebergTable} already exists (presumably within this catalog - confirm against implementations)
   */
  boolean tableAlreadyExists(IcebergTable icebergTable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@
@Slf4j
@Getter
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
private final IcebergTable srcIcebergTable;
/** Presumed destination {@link IcebergTable} exists */
/* CAUTION: *hopefully* `destIcebergTable` exists... although that's not necessarily been verified yet */

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be out of scope for this PR, but shouldn't our default behavior avoid that assumption and create the table if it is missing? That's the contract for Hive distcp.

@phet phet Nov 22, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some catalogs, like openhouse, that's not actually possible: the tables must already have been mated in a way that we are unable to accomplish ourselves (i.e., from outside the catalog itself).

private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
Expand All @@ -75,9 +73,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
/** Destination database name */
public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";

public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
this.srcIcebergTable = srcIcebergTable;
this.destIcebergTable = destIcebergTable;
this.properties = properties;
Expand Down Expand Up @@ -117,17 +113,17 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
return createFileSets(targetFs, configuration);
}

/** @return unique ID for this dataset, usable as a {@link CopyEntity}.fileset, for atomic publication grouping */
/** @return unique ID for dataset (based on the source-side table), usable as a {@link CopyEntity#getFileSet}, for atomic publication grouping */
protected String getFileSetId() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I feel like this is better encapsulated as getSourceTableId because fileSetId() makes me think about some subset of the table, not just the src table.

@phet phet Nov 22, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the name originally arose because it's the value provided to the CopyableFile.Builder.fileSet setter - https://github.com/apache/gobblin/pull/3835/files#diff-da31b49b793e970e5d99d51f52c63d592a6b7dfd8b18d8cc988cd52a9521d478R155

Does it make more sense in context, or do you still recommend changing it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I guess we can keep it the same here for consistency; it's just that in this use case it really refers to the table, since this is a table distcp.

return this.dbName + "." + this.inputTableName;
return this.srcIcebergTable.getTableId().toString();
}

/**
* Generates {@link FileSet}s, being themselves able to generate {@link CopyEntity}s for all files, data and metadata,
* comprising the iceberg/table, so as to fully specify remaining table replication.
*/
protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs, CopyConfiguration configuration) {
FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getFileSetId(), this, targetFs, configuration);
return Iterators.singletonIterator(fileSet);
}

Expand All @@ -140,7 +136,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus(targetFs, copyConfig);
log.info("~{}.{}~ found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
log.info("~{}~ found {} candidate source paths", fileSet, pathToFileStatus.size());

Configuration defaultHadoopConfiguration = new Configuration();
for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
Expand All @@ -165,8 +161,8 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
copyEntities.add(fileEntity);
}
// TODO: Filter properties specific to iceberg registration and avoid serializing every global property
copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName, this.properties));
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
copyEntities.add(createPostPublishStep());
log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
return copyEntities;
}

Expand All @@ -187,8 +183,8 @@ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, Co
IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
return Maps.newHashMap();
Expand All @@ -198,7 +194,7 @@ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, Co
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
// log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
String manListPath = snapshotInfo.getManifestListPath();
log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(),
snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
// ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
// most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
Expand All @@ -224,18 +220,17 @@ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, Co
missingPaths.addAll(mfi.getListedFilePaths());
}
}
log.info("~{}.{}~ snapshot '{}': collected {} additional source paths",
dbName, inputTableName, snapshotInfo.getSnapshotId(), missingPaths.size());
log.info("~{}~ snapshot '{}': collected {} additional source paths",
this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size());
return missingPaths.iterator();
} else {
log.info("~{}.{}~ snapshot '{}' already present on target... skipping (including contents)",
dbName, inputTableName, snapshotInfo.getSnapshotId());
log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)",
this.getFileSetId(), snapshotInfo.getSnapshotId());
// IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
Optional<String> metadataPath = snapshotInfo.getMetadataPath();
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
metadataPath.ifPresent(ignore ->
log.info("~{}.{}~ metadata IS {} already present on target", dbName, inputTableName,
nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
log.info("~{}~ metadata IS {} already present on target", this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
);
return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
}
Expand All @@ -255,7 +250,7 @@ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, Co
try {
results.put(path, this.sourceFs.getFileStatus(path));
if (growthTracker.isAnotherMilestone(results.size())) {
log.info("~{}.{}~ collected file status on '{}' source paths", dbName, inputTableName, results.size());
log.info("~{}~ collected file status on '{}' source paths", this.getFileSetId(), results.size());
}
} catch (FileNotFoundException fnfe) {
if (!shouldTolerateMissingSourceFiles) {
Expand All @@ -265,7 +260,7 @@ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, Co
String total = ++numSourceFilesNotFound + " total";
String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
errorConsolidator.prepLogMsg(path).ifPresent(msg ->
log.warn("~{}.{}~ source {} ({}... {})", dbName, inputTableName, msg, speculation, total)
log.warn("~{}~ source {} ({}... {})", this.getFileSetId(), msg, speculation, total)
);
}
}
Expand Down Expand Up @@ -326,8 +321,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
return this.destIcebergTable.getDatasetDescriptor(targetFs);
}

private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {
IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName, inputTableName, properties);
private PostPublishStep createPostPublishStep() {
IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.srcIcebergTable.getTableId(), this.destIcebergTable.getTableId(), this.properties);
return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,28 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
public static final String ICEBERG_CATALOG_KEY = "catalog";
/**
* This is used with a prefix: "{@link IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "(source or destination)" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
* It is an open-ended pattern used to pass arbitrary catalog specific properties
* This is used with a prefix: "{@link IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "( source | destination )" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
* It is an open-ended pattern used to pass arbitrary catalog-scoped properties
*/
public static final String ICEBERG_CATALOG_CLASS_KEY = "class";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
public static final String ICEBERG_DB_NAME_KEY = "database.name";
public static final String ICEBERG_TABLE_NAME_KEY = "table.name";
/** please use source/dest-scoped properties */
@Deprecated
public static final String ICEBERG_DB_NAME_LEGACY = ICEBERG_DATASET_PREFIX + "." + ICEBERG_DB_NAME_KEY;
/** please use source/dest-scoped properties */
@Deprecated
public static final String ICEBERG_TABLE_NAME_LEGACY = ICEBERG_DATASET_PREFIX + "." + ICEBERG_TABLE_NAME_KEY;

public enum CatalogLocation {
SOURCE,
DESTINATION;

/**
* Provides prefix for configs based on the catalog location to filter catalog specific properties
* Provides prefix for configs based on the catalog orientation (source or destination) for catalog-targeted properties
*/
public String getConfigPrefix() {
return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + "." + ICEBERG_CATALOG_KEY + ".";
return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + ".";
}
}

Expand All @@ -86,20 +92,38 @@ public String getConfigPrefix() {
*/
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
String srcDbName = getLocationQualifiedProperty(properties, CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY);
String destDbName = getLocationQualifiedProperty(properties, CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY);
// TODO: eventually remove support for combo (src+dest) iceberg props, in favor of separate source/dest-scoped props; for now, maintain support
if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName)) {
srcDbName = destDbName = properties.getProperty(ICEBERG_DB_NAME_LEGACY);
}
String srcTableName = getLocationQualifiedProperty(properties, CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY);
String destTableName = getLocationQualifiedProperty(properties, CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY);
// TODO: eventually remove support for combo (src+dest) iceberg props, in favor of separate source/dest-scoped props; for now, maintain support
if (StringUtils.isBlank(srcTableName) || StringUtils.isBlank(destTableName)) {
srcTableName = destTableName = properties.getProperty(ICEBERG_TABLE_NAME_LEGACY);
}
if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(srcTableName)) {
throw new IllegalArgumentException(
String.format("Missing (at least some) IcebergDataset properties - source: ('%s' and '%s') and destination: ('%s' and '%s') "
+ "or [deprecated!] common/combo: ('%s' and '%s')",
calcLocationQualifiedPropName(CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY),
calcLocationQualifiedPropName(CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY),
calcLocationQualifiedPropName(CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY),
calcLocationQualifiedPropName(CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY),
ICEBERG_DB_NAME_LEGACY,
ICEBERG_TABLE_NAME_LEGACY));
}
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);

/* Each Iceberg dataset maps to an Iceberg table */
matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, this.properties, this.sourceFs));
log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one
List<IcebergDataset> matchingDatasets = new ArrayList<>();
matchingDatasets.add(createIcebergDataset(srcIcebergCatalog, srcDbName, srcTableName, destIcebergCatalog, destDbName, destTableName, this.properties, this.sourceFs));
log.info("Found {} matching datasets: {} for the (source) '{}.{}' / (dest) '{}.{}'", matchingDatasets.size(),
matchingDatasets, srcDbName, srcTableName, destDbName, destTableName); // until future support added to specify multiple icebergs, count expected always to be one
return matchingDatasets;
}

Expand All @@ -114,28 +138,39 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
}

/**
* Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
* Uses each source and destination {@link IcebergCatalog} to load and verify existence of the respective {@link IcebergTable}
* Note: the destination side {@link IcebergTable} should be present before initiating replication
*
* @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
*/
protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException {
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName);
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName));
IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs);
}

protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
String prefix = location.getConfigPrefix();
Map<String, String> catalogProperties = buildMapFromPrefixChildren(properties, prefix);
String catalogPrefix = calcLocationQualifiedPropName(location, ICEBERG_CATALOG_KEY + ".");
Map<String, String> catalogProperties = buildMapFromPrefixChildren(properties, catalogPrefix);
// TODO: Filter properties specific to Hadoop
Configuration configuration = HadoopUtils.getConfFromProperties(properties);
String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
}

/**
 * Looks up the property whose name is `relativePropName` qualified by `location`
 * (the name is computed by {@link #calcLocationQualifiedPropName}).
 *
 * @return property value or `null`
 */
protected static String getLocationQualifiedProperty(Properties properties, CatalogLocation location, String relativePropName) {
  String qualifiedPropName = calcLocationQualifiedPropName(location, relativePropName);
  return properties.getProperty(qualifiedPropName);
}

/**
 * Prepends the config prefix of `location` (see {@link CatalogLocation#getConfigPrefix()}) to `relativePropName`.
 *
 * @return absolute (`location`-qualified) property name for `relativePropName`
 */
protected static String calcLocationQualifiedPropName(CatalogLocation location, String relativePropName) {
  String locationPrefix = location.getConfigPrefix();
  return locationPrefix + relativePropName;
}

/**
* Filters the properties based on a prefix using {@link ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map}
*/
Expand Down
Loading