diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 7ac194ffda..4722f92eb6 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -99,7 +99,6 @@ import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.pagination.PageToken; -import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; @@ -221,50 +220,12 @@ public Map getConfigOverrides() { private PolarisAdminService adminService; private PolarisEntityManager entityManager; private FileIOFactory fileIOFactory; + private InMemoryFileIO fileIO; private PolarisEntity catalogEntity; private SecurityContext securityContext; private TestPolarisEventListener testPolarisEventListener; private ReservedProperties reservedProperties; - /** - * A subclass of IcebergCatalog that adds FileIO management capabilities. This allows the file IO - * logic to be encapsulated in a dedicated class. - */ - public static class IcebergFileIOCatalog extends IcebergCatalog { - - public IcebergFileIOCatalog( - PolarisEntityManager entityManager, - PolarisMetaStoreManager metaStoreManager, - CallContext callContext, - PolarisResolutionManifestCatalogView resolvedEntityView, - SecurityContext securityContext, - TaskExecutor taskExecutor, - FileIOFactory fileIOFactory, - PolarisEventListener polarisEventListener) { - super( - entityManager, - metaStoreManager, - callContext, - resolvedEntityView, - securityContext, - taskExecutor, - fileIOFactory, - polarisEventListener); - } - - @Override - public synchronized FileIO getIo() { - if (catalogFileIO == null) { - catalogFileIO = loadFileIO(ioImplClassName, tableDefaultProperties); - if (closeableGroup != null) { - closeableGroup.addCloseable(catalogFileIO); - } - } - - return catalogFileIO; - } - } - @BeforeAll public static void setUpMocks() { PolarisStorageIntegrationProviderImpl mock = @@ -410,7 +371,7 @@ protected IcebergCatalog initCatalog( callContext, entityManager, securityContext, CATALOG_NAME); TaskExecutor taskExecutor = Mockito.mock(); IcebergCatalog icebergCatalog = - new IcebergFileIOCatalog( + new IcebergCatalog( entityManager, metaStoreManager, callContext, @@ -419,6 +380,8 @@ protected IcebergCatalog initCatalog( taskExecutor, fileIOFactory, polarisEventListener); + fileIO = new InMemoryFileIO(); + icebergCatalog.setCatalogFileIo(fileIO); ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") @@ -630,7 +593,6 @@ public void testValidateNotificationWhenTableAndNamespacesDontExist() { // Now also check that despite creating the metadata file, the validation call still doesn't // create any namespaces or tables. - InMemoryFileIO fileIO = getInMemoryIo(catalog); fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -766,8 +728,6 @@ public void testUpdateNotificationWhenTableAndNamespacesDontExist() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -810,8 +770,6 @@ public void testUpdateNotificationCreateTableInDisallowedLocation() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -859,7 +817,7 @@ public void testCreateNotificationCreateTableInExternalLocation() { .addPartitionSpec(PartitionSpec.unpartitioned()) .addSortOrder(SortOrder.unsorted()) .build(); - TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(tableMetadataLocation)); + TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(tableMetadataLocation)); Namespace namespace = Namespace.of("parent", "child1"); TableIdentifier table = TableIdentifier.of(namespace, "my_table"); @@ -919,7 +877,7 @@ public void testCreateNotificationCreateTableOutsideOfMetadataLocation() { .addPartitionSpec(PartitionSpec.unpartitioned()) .addSortOrder(SortOrder.unsorted()) .build(); - TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(tableMetadataLocation)); + TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(tableMetadataLocation)); Namespace namespace = Namespace.of("parent", "child1"); TableIdentifier table = TableIdentifier.of(namespace, "my_table"); @@ -968,7 +926,6 @@ public void testUpdateNotificationCreateTableInExternalLocation() { FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true") .build()); IcebergCatalog catalog = catalog(); - InMemoryFileIO fileIO = getInMemoryIo(catalog); fileIO.addFile( tableMetadataLocation, @@ -999,7 +956,7 @@ public void testUpdateNotificationCreateTableInExternalLocation() { .addPartitionSpec(PartitionSpec.unpartitioned()) .addSortOrder(SortOrder.unsorted()) .build(); - TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(maliciousMetadataFile)); + TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(maliciousMetadataFile)); NotificationRequest updateRequest = new NotificationRequest(); updateRequest.setNotificationType(NotificationType.UPDATE); @@ -1042,7 +999,7 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() { callContext, entityManager, securityContext, catalogWithoutStorage); TaskExecutor taskExecutor = Mockito.mock(); IcebergCatalog catalog = - new IcebergFileIOCatalog( + new IcebergCatalog( entityManager, metaStoreManager, callContext, @@ -1068,8 +1025,6 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( metadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(metadataLocation)).getBytes(UTF_8)); @@ -1108,8 +1063,9 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() { new PolarisPassthroughResolutionView( callContext, entityManager, securityContext, catalogName); TaskExecutor taskExecutor = Mockito.mock(); + InMemoryFileIO localFileIO = new InMemoryFileIO(); IcebergCatalog catalog = - new IcebergFileIOCatalog( + new IcebergCatalog( entityManager, metaStoreManager, callContext, @@ -1126,8 +1082,6 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() { Namespace namespace = Namespace.of("parent", "child1"); TableIdentifier table = TableIdentifier.of(namespace, "table"); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - // The location of the metadata JSON file specified in the create will be forbidden. final String metadataLocation = "http://maliciousdomain.com/metadata.json"; NotificationRequest request = new NotificationRequest(); @@ -1204,8 +1158,6 @@ public void testUpdateNotificationWhenNamespacesExist() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -1256,8 +1208,6 @@ public void testUpdateNotificationWhenTableExists() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -1309,8 +1259,6 @@ public void testUpdateNotificationWhenTableExistsInDisallowedLocation() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -1347,8 +1295,6 @@ public void testUpdateNotificationRejectOutOfOrderTimestamp() { update.setTimestamp(timestamp); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -1420,8 +1366,6 @@ public void testUpdateNotificationWhenTableExistsFileSpecifiesDisallowedLocation update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - // Though the metadata JSON file itself is in an allowed location, make it internally specify // a forbidden table location. TableMetadata forbiddenMetadata = @@ -1498,8 +1442,6 @@ public void testDropNotificationWhenNamespacesExist() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -1550,8 +1492,6 @@ public void testDropNotificationWhenTableExists() { update.setTimestamp(230950845L); request.setPayload(update); - InMemoryFileIO fileIO = getInMemoryIo(catalog); - fileIO.addFile( tableMetadataLocation, TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); @@ -2069,8 +2009,4 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); } - - private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) { - return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo(); - } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index e39a4d0acb..6cd5847c9a 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -169,10 +169,10 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog private final SecurityContext securityContext; private final PolarisEventListener polarisEventListener; - protected String ioImplClassName; - protected FileIO catalogFileIO; - protected CloseableGroup closeableGroup; - protected Map tableDefaultProperties; + private String ioImplClassName; + private FileIO catalogFileIO; + private CloseableGroup closeableGroup; + private Map tableDefaultProperties; private final String catalogName; private long catalogId = -1; @@ -218,8 +218,8 @@ public String name() { } @VisibleForTesting - public FileIO getIo() { - return catalogFileIO; + public void setCatalogFileIo(FileIO fileIO) { + catalogFileIO = fileIO; } @Override @@ -339,7 +339,8 @@ public ViewBuilder buildView(TableIdentifier identifier) { @VisibleForTesting public TableOperations newTableOps( TableIdentifier tableIdentifier, boolean makeMetadataCurrentOnCommit) { - return new BasePolarisTableOperations(getIo(), tableIdentifier, makeMetadataCurrentOnCommit); + return new BasePolarisTableOperations( + catalogFileIO, tableIdentifier, makeMetadataCurrentOnCommit); } @Override @@ -844,7 +845,7 @@ private Page listViews(Namespace namespace, PageToken pageToken @VisibleForTesting @Override protected ViewOperations newViewOps(TableIdentifier identifier) { - return new BasePolarisViewOperations(getIo(), identifier); + return new BasePolarisViewOperations(catalogFileIO, identifier); } @Override