Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ public void dropTable(String catalog, TableIdentifier id) {
}
}

public int loadTable(String catalog, TableIdentifier id, String snapshots) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
request(
"v1/{cat}/namespaces/" + ns + "/tables/{table}",
Map.of("cat", catalog, "table", id.name()),
Map.of("snapshots", snapshots))
.get()) {
return res.getStatus();
}
}

public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
String ns = RESTUtil.encodeNamespace(namespace);
try (Response res =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.polaris.service.it.test;

import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -64,6 +66,7 @@
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.Catalog;
Expand Down Expand Up @@ -1343,4 +1346,69 @@ public void testDropNonExistingGenericTable() {

genericTableApi.purge(currentCatalogName, namespace);
}

@Test
public void testLoadTableWithSnapshots() {
Namespace namespace = Namespace.of("ns1");
restCatalog.createNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
restCatalog.createTable(tableIdentifier, SCHEMA);

assertThat(catalogApi.loadTable(currentCatalogName, tableIdentifier, "ALL"))
.isEqualTo(OK.getStatusCode());
assertThat(catalogApi.loadTable(currentCatalogName, tableIdentifier, "all"))
.isEqualTo(OK.getStatusCode());
assertThat(catalogApi.loadTable(currentCatalogName, tableIdentifier, "refs"))
.isEqualTo(OK.getStatusCode());
assertThat(catalogApi.loadTable(currentCatalogName, tableIdentifier, "REFS"))
.isEqualTo(OK.getStatusCode());
assertThat(catalogApi.loadTable(currentCatalogName, tableIdentifier, "not-real"))
.isEqualTo(BAD_REQUEST.getStatusCode());

catalogApi.purge(currentCatalogName, namespace);
}

@Test
public void testLoadTableWithRefFiltering() {
Namespace namespace = Namespace.of("ns1");
restCatalog.createNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");

restCatalog.createTable(tableIdentifier, SCHEMA);

Table table = restCatalog.loadTable(tableIdentifier);

// Create an orphaned snapshot:
table.newAppend().appendFile(FILE_A).commit();
long snapshotIdA = table.currentSnapshot().snapshotId();
table.newAppend().appendFile(FILE_B).commit();
table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit();

String ns = RESTUtil.encodeNamespace(tableIdentifier.namespace());
try (Response res =
catalogApi
.request(
"v1/{cat}/namespaces/" + ns + "/tables/{table}",
Map.of("cat", currentCatalogName, "table", tableIdentifier.name()),
Map.of("snapshots", "all"))
.get()) {
LoadTableResponse responseContent = res.readEntity(LoadTableResponse.class);
assertThat(responseContent.tableMetadata().snapshots().size()).isEqualTo(2);
}

try (Response res =
catalogApi
.request(
"v1/{cat}/namespaces/" + ns + "/tables/{table}",
Map.of("cat", currentCatalogName, "table", tableIdentifier.name()),
Map.of("snapshots", "refs"))
.get()) {
LoadTableResponse responseContent = res.readEntity(LoadTableResponse.class);
assertThat(responseContent.tableMetadata().snapshots().size()).isEqualTo(1);
assertThat(responseContent.tableMetadata().snapshots().get(0).snapshotId())
.isEqualTo(snapshotIdA);
}

catalogApi.purge(currentCatalogName, namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import jakarta.annotation.Nonnull;
import jakarta.ws.rs.core.SecurityContext;
import java.io.Closeable;
import java.time.OffsetDateTime;
Expand All @@ -36,6 +37,7 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
protected SupportsNamespaces namespaceCatalog = null;
protected ViewCatalog viewCatalog = null;

public static final String SNAPSHOTS_ALL = "all";
public static final String SNAPSHOTS_REFS = "refs";

public IcebergCatalogHandler(
CallContext callContext,
PolarisEntityManager entityManager,
Expand Down Expand Up @@ -380,7 +385,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST))
PolarisStorageActions.LIST),
SNAPSHOTS_ALL)
.build();
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
Expand Down Expand Up @@ -471,7 +477,7 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
TableMetadata metadata = stageTableCreateHelper(namespace, request);

return buildLoadTableResponseWithDelegationCredentials(
ident, metadata, Set.of(PolarisStorageActions.ALL))
ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
.build();
}

Expand Down Expand Up @@ -580,7 +586,18 @@ public Optional<LoadTableResponse> loadTableIfStale(
}
}

return Optional.of(CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
Table table = baseCatalog.loadTable(tableIdentifier);
if (table instanceof BaseMetadataTable) {
throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString());
} else if (!(table instanceof BaseTable)) {
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
} else {
LoadTableResponse rawResponse =
LoadTableResponse.builder()
.withTableMetadata(((BaseTable) table).operations().current())
.build();
return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
}
}

public LoadTableResponse loadTableWithAccessDelegation(
Expand Down Expand Up @@ -679,7 +696,7 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
TableMetadata tableMetadata = baseTable.operations().current();
return Optional.of(
buildLoadTableResponseWithDelegationCredentials(
tableIdentifier, tableMetadata, actionsRequested)
tableIdentifier, tableMetadata, actionsRequested, snapshots)
.build());
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
Expand All @@ -692,7 +709,8 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials(
TableIdentifier tableIdentifier,
TableMetadata tableMetadata,
Set<PolarisStorageActions> actions) {
Set<PolarisStorageActions> actions,
String snapshots) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
Expand Down Expand Up @@ -1010,6 +1028,31 @@ public void renameView(RenameTableRequest request) {
CatalogHandlers.renameView(viewCatalog, request);
}

private @Nonnull LoadTableResponse filterResponseToSnapshots(
LoadTableResponse loadTableResponse, String snapshots) {
if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) {
return loadTableResponse;
} else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) {
TableMetadata metadata = loadTableResponse.tableMetadata();

Set<Long> referencedSnapshotIds =
metadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());

TableMetadata filteredMetadata =
metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId()));

return LoadTableResponse.builder()
.withTableMetadata(filteredMetadata)
.addAllConfig(loadTableResponse.config())
.addAllCredentials(loadTableResponse.credentials())
.build();
} else {
throw new IllegalArgumentException("Unrecognized snapshots: " + snapshots);
}
}

@Override
public void close() throws Exception {
if (baseCatalog instanceof Closeable closeable) {
Expand Down