Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ protected BehaviorChangeConfiguration(
.description("Whether or not to use soft values in the entity cache")
.defaultValue(false)
.buildBehaviorChangeConfiguration();

public static final BehaviorChangeConfiguration<Boolean>
TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT =
PolarisConfiguration.<Boolean>builder()
.key("TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT")
.description(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviving thread incorrectly marked as outdated!
#1456 (comment)

"If true, BasePolarisTableOperations should mark the metadata that is passed into"
+ " `commit` as current, and re-use it to skip a trip to object storage to re-construct"
+ " the committed metadata again.")
.defaultValue(true)
.buildBehaviorChangeConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.exception.RetryableException;
Expand Down Expand Up @@ -1693,8 +1695,8 @@ public void testFileIOWrapper() {

table.updateProperties().set("foo", "bar").commit();
Assertions.assertThat(measured.getInputBytes())
.as("A table was read and written")
.isGreaterThan(0);
.as("A table was read and written, but a trip to storage was made")
.isEqualTo(0);

Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue();
TaskEntity taskEntity =
Expand Down Expand Up @@ -1843,6 +1845,56 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
.hasMessageContaining("conflict_table");
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) {
Assumptions.assumeTrue(
requiresNamespaceCreate(),
Comment on lines +1851 to +1852
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean assertTrue? If it's not true under an "assumption" the test will simply be skipped, but I guess we'll want to revisit and fix the test if it ever happens 🤔

Copy link
Contributor Author

@eric-maynard eric-maynard Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Assumptions.assumeTrue is proliferated all through the existing suite, so if we want to update this we could also do them all at that time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough

"Only applicable if namespaces must be created before adding children");

catalog.createNamespace(NS);
catalog.buildTable(TABLE, SCHEMA).create();

IcebergCatalog.BasePolarisTableOperations realOps =
(IcebergCatalog.BasePolarisTableOperations)
catalog.newTableOps(TABLE, updateMetadataOnCommit);
IcebergCatalog.BasePolarisTableOperations ops = Mockito.spy(realOps);

try (MockedStatic<TableMetadataParser> mocked =
Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
TableMetadata base1 = ops.current();
mocked.verify(
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));

TableMetadata base2 = ops.refresh();
mocked.verify(
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));

Assertions.assertThat(base1.metadataFileLocation()).isEqualTo(base2.metadataFileLocation());
Assertions.assertThat(base1).isEqualTo(base2);

Schema newSchema =
new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get()));
TableMetadata newMetadata =
TableMetadata.buildFrom(base1).setCurrentSchema(newSchema, 100).build();
ops.commit(base2, newMetadata);
mocked.verify(
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));

ops.current();
int expectedReads = updateMetadataOnCommit ? 1 : 2;
mocked.verify(
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
Mockito.times(expectedReads));
ops.refresh();
mocked.verify(
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
Mockito.times(expectedReads));
} finally {
catalog.dropTable(TABLE, true);
}
}

private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.azure.core.exception.AzureException;
Expand Down Expand Up @@ -117,6 +118,30 @@ private static void requestCreateTable() {
res.close();
}

private static void requestDropTable() {
Response res =
services
.restApi()
.dropTable(
catalog, "ns1", "t1", false, services.realmContext(), services.securityContext());
res.close();
}

private static void requestLoadTable() {
Response res =
services
.restApi()
.loadTable(
catalog,
"ns1",
"t1",
null,
"ALL",
services.realmContext(),
services.securityContext());
res.close();
}

static Stream<RuntimeException> exceptions() {
return Stream.of(
new AzureException("Forbidden"),
Expand All @@ -135,7 +160,9 @@ void testLoadFileIOExceptionPropagation(RuntimeException ex) {
@MethodSource("exceptions")
void testNewInputFileExceptionPropagation(RuntimeException ex) {
ioFactory.newInputFileExceptionSupplier = Optional.of(() -> ex);
assertThatThrownBy(FileIOExceptionsTest::requestCreateTable).isSameAs(ex);
assertThatCode(FileIOExceptionsTest::requestCreateTable).doesNotThrowAnyException();
assertThatThrownBy(FileIOExceptionsTest::requestLoadTable).isSameAs(ex);
assertThatCode(FileIOExceptionsTest::requestDropTable).doesNotThrowAnyException();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,22 @@ public ViewBuilder buildView(TableIdentifier identifier) {
return new PolarisIcebergCatalogViewBuilder(identifier);
}

@VisibleForTesting
public TableOperations newTableOps(
TableIdentifier tableIdentifier, boolean makeMetadataCurrentOnCommit) {
return new BasePolarisTableOperations(
catalogFileIO, tableIdentifier, makeMetadataCurrentOnCommit);
}

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new BasePolarisTableOperations(catalogFileIO, tableIdentifier);
boolean makeMetadataCurrentOnCommit =
getCurrentPolarisContext()
.getConfigurationStore()
.getConfiguration(
getCurrentPolarisContext(),
BehaviorChangeConfiguration.TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT);
return newTableOps(tableIdentifier, makeMetadataCurrentOnCommit);
}

@Override
Expand Down Expand Up @@ -1207,17 +1220,24 @@ public ViewBuilder withLocation(String newLocation) {
* org.apache.iceberg.BaseMetastoreTableOperations}. CODE_COPIED_TO_POLARIS From Apache Iceberg
* Version: 1.8
*/
private class BasePolarisTableOperations extends PolarisOperationsBase<TableMetadata>
@VisibleForTesting
public class BasePolarisTableOperations extends PolarisOperationsBase<TableMetadata>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we need to make this public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added @VisibleForTesting for clarity

implements TableOperations {
private final TableIdentifier tableIdentifier;
private final String fullTableName;
private final boolean makeMetadataCurrentOnCommit;

private FileIO tableFileIO;

BasePolarisTableOperations(FileIO defaultFileIO, TableIdentifier tableIdentifier) {
BasePolarisTableOperations(
FileIO defaultFileIO,
TableIdentifier tableIdentifier,
boolean makeMetadataCurrentOnCommit) {
LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier);
this.tableIdentifier = tableIdentifier;
this.fullTableName = fullTableName(catalogName, tableIdentifier);
this.tableFileIO = defaultFileIO;
this.makeMetadataCurrentOnCommit = makeMetadataCurrentOnCommit;
}

@Override
Expand Down Expand Up @@ -1476,6 +1496,17 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
+ "because it has been concurrently modified to %s",
tableIdentifier, oldLocation, newLocation, existingLocation);
}

// We diverge from `BaseMetastoreTableOperations` in the below code block
if (makeMetadataCurrentOnCommit) {
currentMetadata =
TableMetadata.buildFrom(metadata)
.withMetadataLocation(newLocation)
.discardChanges()
.build();
currentMetadataLocation = newLocation;
Comment on lines +1502 to +1507
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great

}

if (null == existingLocation) {
createTableLike(tableIdentifier, entity);
} else {
Expand Down