Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.gobblin.dataset.DatasetConstants;

/**
* Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
Expand All @@ -42,7 +43,7 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
@Override
public IcebergTable openTable(String dbName, String tableName) {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), createTableOperations(tableId), this.getCatalogUri());
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), createTableOperations(tableId), this.getCatalogUri());
}

protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
Expand All @@ -57,5 +58,13 @@ protected String calcDatasetDescriptorName(TableIdentifier tableId) {
return tableId.toString(); // default to FQ ID with both table namespace and name
}

/**
* Enable catalog-specific naming for charting lineage, etc. This default impl gives {@link DatasetConstants#PLATFORM_ICEBERG}
* @return the {@link org.apache.gobblin.dataset.DatasetDescriptor#getPlatform()} to use for tables from this catalog
*/
protected String getDatasetDescriptorPlatform() {
return DatasetConstants.PLATFORM_ICEBERG;
}

protected abstract TableOperations createTableOperations(TableIdentifier tableId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ public TableNotFoundException(TableIdentifier tableId) {

@Getter
private final TableIdentifier tableId;
/** allow the {@link IcebergCatalog} creating this table to qualify its name when used for lineage, etc. */
/** allow the {@link IcebergCatalog} creating this table to qualify its {@link DatasetDescriptor#getName()} used for lineage, etc. */
private final String datasetDescriptorName;
/** allow the {@link IcebergCatalog} creating this table to specify the {@link DatasetDescriptor#getPlatform()} used for lineage, etc. */
private final String datasetDescriptorPlatform;
private final TableOperations tableOps;
private final String catalogUri;

@VisibleForTesting
IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) {
this(tableId, tableId.toString(), tableOps, catalogUri);
this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri);
}

/** @return metadata info limited to the most recent (current) snapshot */
Expand Down Expand Up @@ -194,7 +196,7 @@ protected static List<String> discoverDataFilePaths(ManifestFile manifest, FileI

public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
DatasetDescriptor descriptor = new DatasetDescriptor(
DatasetConstants.PLATFORM_ICEBERG,
datasetDescriptorPlatform,
URI.create(this.catalogUri),
this.datasetDescriptorName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ public void setUp() throws Exception {
public void testGetDatasetDescriptor() throws URISyntaxException {
TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
String qualifiedTableName = "foo_prefix." + tableId.toString();
IcebergTable table = new IcebergTable(tableId, qualifiedTableName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
String platformName = "Floe";
IcebergTable table = new IcebergTable(tableId, qualifiedTableName, platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
FileSystem mockFs = Mockito.mock(FileSystem.class);
Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), qualifiedTableName);
DatasetDescriptor expected = new DatasetDescriptor(platformName, URI.create(SRC_CATALOG_URI), qualifiedTableName);
expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
}
Expand Down