From f5fd9ae5d984e03bddeb3965e286e72f82050527 Mon Sep 17 00:00:00 2001 From: Gabor Kaszab Date: Mon, 20 Oct 2025 15:45:05 +0200 Subject: [PATCH 1/2] Core: Freshness-aware table loading in REST catalog This is the client-side improvement for the freshness-aware table loading in REST catalog. The main design is the following: - REST server can send an ETag with the LoadTableResponse - The client can use this ETag to populate the IF_NONE_MATCH header with the next loadTable request - The server can send a 304-NOT_MODIFIED response without a body if the table has not been changed based on the ETag - The client when receives a 304, then returns the latest table object associated with the ETag from cache --- .../org/apache/iceberg/rest/HTTPClient.java | 10 + .../org/apache/iceberg/rest/RESTCatalog.java | 4 + .../iceberg/rest/RESTCatalogProperties.java | 11 + .../iceberg/rest/RESTSessionCatalog.java | 168 ++++- .../apache/iceberg/rest/RESTTableCache.java | 132 ++++ .../apache/iceberg/rest/TestRESTCatalog.java | 659 +++++++++++++++++- .../iceberg/rest/TestRESTScanPlanning.java | 20 + .../iceberg/rest/TestableRESTCatalog.java | 52 ++ .../rest/TestableRESTSessionCatalog.java | 44 ++ 9 files changed, 1031 insertions(+), 69 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java create mode 100644 core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java create mode 100644 core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java index b30caf1c7db0..2fd3ec882a66 100644 --- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -326,6 +326,16 @@ protected T execute( // Skip parsing the response stream for any successful request not expecting a response body if (emptyBody(response, responseType)) { + if (response.getCode() == HttpStatus.SC_NOT_MODIFIED + && !req.headers().contains(HttpHeaders.IF_NONE_MATCH)) { + // 304-NOT_MODIFIED is used for freshness-aware loading and requires an ETag sent to the + // server via IF_NONE_MATCH header in the request. If no ETag was sent, we shouldn't + // receive a 304. + throw new RESTException( + "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s", + req.method(), req.path()); + } + return null; } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index aff8832c6bf4..f4c75d1050d7 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -96,6 +96,10 @@ public void initialize(String name, Map props) { sessionCatalog.initialize(name, props); } + protected RESTSessionCatalog sessionCatalog() { + return sessionCatalog; + } + @Override public String name() { return sessionCatalog.name(); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 72b09fa772c5..e294bcfebe46 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.rest; +import java.util.concurrent.TimeUnit; + public final class RESTCatalogProperties { private RESTCatalogProperties() {} @@ -43,6 +45,15 @@ private RESTCatalogProperties() {} public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id"; + // Properties that control the behaviour of the table cache used for freshness-aware table + // loading. + public static final String TABLE_CACHE_EXPIRE_AFTER_WRITE_MS = + "rest-table-cache.expire-after-write-ms"; + public static final long TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + + public static final String TABLE_CACHE_MAX_ENTRIES = "rest-table-cache.max-entries"; + public static final int TABLE_CACHE_MAX_ENTRIES_DEFAULT = 100; + public enum SnapshotMode { ALL, REFS diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 4ff13ac82417..995a760ad273 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -26,9 +26,11 @@ import java.util.Map; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.hc.core5.http.HttpHeaders; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -60,6 +62,7 @@ import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.metrics.MetricsReporters; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -67,6 +70,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; +import org.apache.iceberg.rest.RESTTableCache.TableSupplierWithETag; import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthManagers; import org.apache.iceberg.rest.auth.AuthSession; @@ -165,6 +169,8 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Supplier> mutationHeaders = Map::of; private String namespaceSeparator = null; + private RESTTableCache tableCache; + public RESTSessionCatalog() { this( config -> @@ -277,9 +283,22 @@ public void initialize(String name, Map unresolved) { mergedProps, RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT); + + this.tableCache = createTableCache(mergedProps); + this.closeables.addCloseable(this.tableCache); + super.initialize(name, mergedProps); } + protected RESTTableCache createTableCache(Map props) { + return new RESTTableCache(props); + } + + @VisibleForTesting + RESTTableCache tableCache() { + return tableCache; + } + @Override public void setConf(Object newConf) { this.conf = newConf; @@ -332,6 +351,8 @@ public boolean dropTable(SessionContext context, TableIdentifier identifier) { return true; } catch (NoSuchTableException e) { return false; + } finally { + invalidateTable(context, identifier); } } @@ -353,6 +374,8 @@ public boolean purgeTable(SessionContext context, TableIdentifier identifier) { return true; } catch (NoSuchTableException e) { return false; + } finally { + invalidateTable(context, identifier); } } @@ -370,6 +393,8 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent client .withAuthSession(contextualSession) .post(paths.rename(), request, null, mutationHeaders, ErrorHandlers.tableErrorHandler()); + + invalidateTable(context, from); } @Override @@ -384,9 +409,15 @@ public boolean tableExists(SessionContext context, TableIdentifier identifier) { return true; } else { // fallback in order to work with 1.7.x and older servers - return super.tableExists(context, identifier); + if (!super.tableExists(context, identifier)) { + invalidateTable(context, identifier); + return false; + } + + return true; } } catch (NoSuchTableException e) { + invalidateTable(context, identifier); return false; } } @@ -396,7 +427,11 @@ private static Map snapshotModeToParam(SnapshotMode mode) { } private LoadTableResponse loadInternal( - SessionContext context, TableIdentifier identifier, SnapshotMode mode) { + SessionContext context, + TableIdentifier identifier, + SnapshotMode mode, + Map headers, + Consumer> responseHeaders) { Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); return client @@ -405,8 +440,9 @@ private LoadTableResponse loadInternal( paths.table(identifier), snapshotModeToParam(mode), LoadTableResponse.class, - Map.of(), - ErrorHandlers.tableErrorHandler()); + headers, + ErrorHandlers.tableErrorHandler(), + responseHeaders); } @Override @@ -424,8 +460,25 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { MetadataTableType metadataType; LoadTableResponse response; TableIdentifier loadedIdent; + + Map responseHeaders = Maps.newHashMap(); + TableSupplierWithETag cachedTable = tableCache.getIfPresent(context.sessionId(), identifier); + try { - response = loadInternal(context, identifier, snapshotMode); + response = + loadInternal( + context, + identifier, + snapshotMode, + headersForLoadTable(cachedTable), + responseHeaders::putAll); + + if (response == null) { + Preconditions.checkNotNull(cachedTable, "Invalid load table response: null"); + + return cachedTable.tableSupplier().get(); + } + loadedIdent = identifier; metadataType = null; @@ -435,14 +488,33 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { // attempt to load a metadata table using the identifier's namespace as the base table TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels()); try { - response = loadInternal(context, baseIdent, snapshotMode); + responseHeaders.clear(); + cachedTable = tableCache.getIfPresent(context.sessionId(), baseIdent); + + response = + loadInternal( + context, + baseIdent, + snapshotMode, + headersForLoadTable(cachedTable), + responseHeaders::putAll); + + if (response == null) { + Preconditions.checkNotNull(cachedTable, "Invalid load table response: null"); + + return MetadataTableUtils.createMetadataTableInstance( + cachedTable.tableSupplier().get(), metadataType); + } + loadedIdent = baseIdent; } catch (NoSuchTableException ignored) { // the base table does not exist + invalidateTable(context, baseIdent); throw original; } } else { // name is not a metadata table + invalidateTable(context, identifier); throw original; } } @@ -461,7 +533,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { .setPreviousFileLocation(null) .setSnapshotsSupplier( () -> - loadInternal(context, finalIdentifier, SnapshotMode.ALL) + loadInternal(context, finalIdentifier, SnapshotMode.ALL, Map.of(), h -> {}) .tableMetadata() .snapshots()) .discardChanges() @@ -470,38 +542,52 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { tableMetadata = response.tableMetadata(); } + List credentials = response.credentials(); RESTClient tableClient = client.withAuthSession(tableSession); - RESTTableOperations ops = - newTableOps( - tableClient, - paths.table(finalIdentifier), - Map::of, - mutationHeaders, - tableFileIO(context, tableConf, response.credentials()), - tableMetadata, - endpoints); + Supplier tableSupplier = + createTableSupplier( + finalIdentifier, tableMetadata, context, tableClient, tableConf, credentials); - trackFileIO(ops); - - // RestTable should only be returned for non-metadata tables, because client would - // not have access to metadata files for example manifests, since all it needs is catalog. - if (metadataType == null) { - RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient); - if (restTable != null) { - return restTable; - } + String eTag = responseHeaders.getOrDefault(HttpHeaders.ETAG, null); + if (eTag != null) { + tableCache.put(context.sessionId(), finalIdentifier, tableSupplier, eTag); } - BaseTable table = - new BaseTable( - ops, - fullTableName(finalIdentifier), - metricsReporter(paths.metrics(finalIdentifier), tableClient)); if (metadataType != null) { - return MetadataTableUtils.createMetadataTableInstance(table, metadataType); + return MetadataTableUtils.createMetadataTableInstance(tableSupplier.get(), metadataType); } - return table; + return tableSupplier.get(); + } + + private Supplier createTableSupplier( + TableIdentifier identifier, + TableMetadata tableMetadata, + SessionContext context, + RESTClient tableClient, + Map tableConf, + List credentials) { + return () -> { + RESTTableOperations ops = + newTableOps( + tableClient, + paths.table(identifier), + Map::of, + mutationHeaders, + tableFileIO(context, tableConf, credentials), + tableMetadata, + endpoints); + + trackFileIO(ops); + + RESTTable table = restTableForScanPlanning(ops, identifier, tableClient); + if (table != null) { + return table; + } + + return new BaseTable( + ops, fullTableName(identifier), metricsReporter(paths.metrics(identifier), tableClient)); + }; } private RESTTable restTableForScanPlanning( @@ -546,7 +632,9 @@ public Catalog.TableBuilder buildTable( } @Override - public void invalidateTable(SessionContext context, TableIdentifier ident) {} + public void invalidateTable(SessionContext context, TableIdentifier ident) { + tableCache.invalidate(context.sessionId(), ident); + } @Override public Table registerTable( @@ -906,7 +994,8 @@ public Transaction replaceTransaction() { throw new AlreadyExistsException("View with same name already exists: %s", ident); } - LoadTableResponse response = loadInternal(context, ident, snapshotMode); + LoadTableResponse response = loadInternal(context, ident, snapshotMode, Map.of(), h -> {}); + String fullName = fullTableName(ident); Map tableConf = response.config(); @@ -1368,6 +1457,17 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti .post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler()); } + private static Map headersForLoadTable(TableSupplierWithETag tableWithETag) { + if (tableWithETag == null) { + return Map.of(); + } + + String eTag = tableWithETag.eTag(); + Preconditions.checkArgument(eTag != null, "Invalid ETag: null"); + + return Map.of(HttpHeaders.IF_NONE_MATCH, eTag); + } + private class RESTViewBuilder implements ViewBuilder { private final SessionContext context; private final TableIdentifier identifier; diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java new file mode 100644 index 000000000000..6f2dd7ccfa8f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Ticker; +import java.io.Closeable; +import java.time.Duration; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; +import org.immutables.value.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class RESTTableCache implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RESTTableCache.class); + + @Value.Immutable + interface SessionIdTableId { + String sessionId(); + + TableIdentifier tableIdentifier(); + + static SessionIdTableId of(String sessionId, TableIdentifier ident) { + return ImmutableSessionIdTableId.builder() + .sessionId(sessionId) + .tableIdentifier(ident) + .build(); + } + } + + @Value.Immutable + interface TableSupplierWithETag { + Supplier tableSupplier(); + + String eTag(); + + static TableSupplierWithETag of(Supplier tableSupplier, String eTag) { + return ImmutableTableSupplierWithETag.builder() + .tableSupplier(tableSupplier) + .eTag(eTag) + .build(); + } + } + + private final Cache tableCache; + + RESTTableCache(Map props) { + this(props, Ticker.systemTicker()); + } + + @VisibleForTesting + RESTTableCache(Map props, Ticker ticker) { + long expireAfterWriteMS = + PropertyUtil.propertyAsLong( + props, + RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, + RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT); + Preconditions.checkArgument( + expireAfterWriteMS > 0, "Invalid expire after write: zero or negative"); + + long numEntries = + PropertyUtil.propertyAsLong( + props, + RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, + RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES_DEFAULT); + Preconditions.checkArgument(numEntries >= 0, "Invalid max entries: negative"); + + tableCache = + Caffeine.newBuilder() + .maximumSize(numEntries) + .expireAfterWrite(Duration.ofMillis(expireAfterWriteMS)) + .removalListener( + (compositeKey, table, cause) -> + LOG.debug("Evicted {} from table cache ({})", compositeKey, cause)) + .recordStats() + .ticker(ticker) + .build(); + } + + public TableSupplierWithETag getIfPresent(String sessionId, TableIdentifier identifier) { + SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier); + return tableCache.getIfPresent(cacheKey); + } + + public void put( + String sessionId, + TableIdentifier identifier, + Supplier tableSupplier, + String eTag) { + tableCache.put( + SessionIdTableId.of(sessionId, identifier), TableSupplierWithETag.of(tableSupplier, eTag)); + } + + public void invalidate(String sessionId, TableIdentifier identifier) { + SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier); + tableCache.invalidate(cacheKey); + } + + @VisibleForTesting + Cache tableCache() { + return tableCache; + } + + @Override + public void close() { + tableCache.invalidateAll(); + tableCache.cleanUp(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 40dc050311fe..dcb615e84682 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -33,11 +33,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.Cache; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -55,6 +57,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.http.HttpHeaders; +import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; @@ -79,6 +82,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RESTException; @@ -93,6 +97,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; +import org.apache.iceberg.rest.RESTTableCache.SessionIdTableId; +import org.apache.iceberg.rest.RESTTableCache.TableSupplierWithETag; import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthManagers; import org.apache.iceberg.rest.auth.AuthSession; @@ -109,6 +115,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.FakeTicker; import org.apache.iceberg.util.Pair; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; @@ -133,6 +140,16 @@ public class TestRESTCatalog extends CatalogTests { ImmutableMap.of( RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8)); + private static final Duration TABLE_EXPIRATION = + Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT); + private static final Duration HALF_OF_TABLE_EXPIRATION = TABLE_EXPIRATION.dividedBy(2); + + private static final SessionCatalog.SessionContext DEFAULT_SESSION_CONTEXT = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); private static final class IdempotentEnv { private final TableIdentifier ident; @@ -306,16 +323,10 @@ public void createCatalog() throws Exception { @Override protected RESTCatalog initCatalog(String catalogName, Map additionalProperties) { Configuration conf = new Configuration(); - SessionCatalog.SessionContext context = - new SessionCatalog.SessionContext( - UUID.randomUUID().toString(), - "user", - ImmutableMap.of("credential", "user:12345"), - ImmutableMap.of()); RESTCatalog catalog = new RESTCatalog( - context, + DEFAULT_SESSION_CONTEXT, (config) -> HTTPClient.builder(config) .uri(config.get(CatalogProperties.URI)) @@ -3009,12 +3020,13 @@ public void testETagWithRegisterTable() { assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag); } - @SuppressWarnings("checkstyle:AssertThatThrownByWithMessageCheck") @Test public void testNotModified() { catalog().createNamespace(TABLE.namespace()); - Table table = catalog().createTable(TABLE, SCHEMA); + catalog().createTable(TABLE, SCHEMA); + + Table table = catalog().loadTable(TABLE); String eTag = ETagProvider.of(((BaseTable) table).operations().current().metadataFileLocation()); @@ -3023,26 +3035,21 @@ public void testNotModified() { invocation -> { HTTPRequest originalRequest = invocation.getArgument(0); - HTTPHeaders extendedHeaders = - ImmutableHTTPHeaders.copyOf(originalRequest.headers()) - .putIfAbsent( - ImmutableHTTPHeader.builder() - .name(HttpHeaders.IF_NONE_MATCH) - .value(eTag) - .build()); - - ImmutableHTTPRequest extendedRequest = - ImmutableHTTPRequest.builder() - .from(originalRequest) - .headers(extendedHeaders) - .build(); - - return adapterForRESTServer.execute( - extendedRequest, - LoadTableResponse.class, - invocation.getArgument(2), - invocation.getArgument(3), - ParserContext.builder().build()); + assertThat(originalRequest.headers().contains(HttpHeaders.IF_NONE_MATCH)); + assertThat( + originalRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH).get().value()) + .isEqualTo(eTag); + + assertThat( + adapterForRESTServer.execute( + originalRequest, + LoadTableResponse.class, + invocation.getArgument(2), + invocation.getArgument(3), + ParserContext.builder().build())) + .isNull(); + + return null; }) .when(adapterForRESTServer) .execute( @@ -3051,17 +3058,14 @@ public void testNotModified() { any(), any()); - // TODO: This won't throw when client side of freshness-aware loading is implemented - assertThatThrownBy(() -> catalog().loadTable(TABLE)).isInstanceOf(NullPointerException.class); + catalog().loadTable(TABLE); TableIdentifier metadataTableIdentifier = TableIdentifier.of(NS.toString(), TABLE.name(), "partitions"); - // TODO: This won't throw when client side of freshness-aware loading is implemented - assertThatThrownBy(() -> catalog().loadTable(metadataTableIdentifier)) - .isInstanceOf(NullPointerException.class); + catalog().loadTable(metadataTableIdentifier); - Mockito.verify(adapterForRESTServer, times(2)) + Mockito.verify(adapterForRESTServer, times(3)) .execute( reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), eq(LoadTableResponse.class), @@ -3229,6 +3233,7 @@ public void testCommitStateUnknownNotReconciled() { .satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty()); } + @SuppressWarnings("MethodLength") @Test public void testCustomTableOperationsInjection() throws IOException { AtomicBoolean customTableOpsCalled = new AtomicBoolean(); @@ -3378,6 +3383,60 @@ protected RESTTableOperations newTableOps( } } + @Test + public void testCustomTableOperationsWithFreshnessAwareLoading() { + class CustomTableOps extends RESTTableOperations { + CustomTableOps( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + TableMetadata current, + Set endpoints) { + super(client, path, readHeaders, mutationHeaders, io, current, endpoints); + } + } + + class CustomRESTSessionCatalog extends RESTSessionCatalog { + CustomRESTSessionCatalog( + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder) { + super(clientBuilder, ioBuilder); + } + + @Override + protected RESTTableOperations newTableOps( + RESTClient restClient, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO fileIO, + TableMetadata current, + Set supportedEndpoints) { + return new CustomTableOps( + restClient, path, readHeaders, mutationHeaders, fileIO, current, supportedEndpoints); + } + } + + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + RESTCatalog catalog = + catalog(adapter, clientBuilder -> new CustomRESTSessionCatalog(clientBuilder, null)); + + catalog.createNamespace(NS); + + catalog.createTable(TABLE, SCHEMA); + + expectFullTableLoadForLoadTable(TABLE, adapter); + BaseTable table = (BaseTable) catalog.loadTable(TABLE); + assertThat(table.operations()).isInstanceOf(CustomTableOps.class); + + // When answering loadTable from table cache we still get the injected ops. + expectNotModifiedResponseForLoadTable(TABLE, adapter); + table = (BaseTable) catalog.loadTable(TABLE); + assertThat(table.operations()).isInstanceOf(CustomTableOps.class); + } + @Test public void testClientAutoSendsIdempotencyWhenServerAdvertises() { ConfigResponse cfgWithIdem = @@ -3778,6 +3837,497 @@ public void testLoadTableWithMissingMetadataFile(@TempDir Path tempDir) { .hasMessageContaining("No in-memory file found for location: " + metadataFileLocation); } + @Test + public void testInvalidTableCacheParameters() { + RESTCatalog catalog = new RESTCatalog(config -> new RESTCatalogAdapter(backendCatalog)); + + assertThatThrownBy( + () -> + catalog.initialize( + "test", Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid expire after write: zero or negative"); + + assertThatThrownBy( + () -> + catalog.initialize( + "test", Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid expire after write: zero or negative"); + + assertThatThrownBy( + () -> + catalog.initialize( + "test", Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid max entries: negative"); + } + + @Test + public void testFreshnessAwareLoading() { + catalog().createNamespace(TABLE.namespace()); + + catalog().createTable(TABLE, SCHEMA); + + Cache tableCache = + restCatalog.sessionCatalog().tableCache().tableCache(); + assertThat(tableCache.estimatedSize()).isZero(); + + expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer); + + BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer); + + BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE); + + assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad); + assertThat(tableAfterFirstLoad.operations().current().location()) + .isEqualTo(tableAfterSecondLoad.operations().current().location()); + assertThat( + tableCache + .asMap() + .get(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)) + .tableSupplier() + .get() + .operations() + .current() + .metadataFileLocation()) + .isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation()); + + Mockito.verify(adapterForRESTServer, times(2)) + .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + } + + @Test + public void testFreshnessAwareLoadingMetadataTables() { + catalog().createNamespace(TABLE.namespace()); + + catalog().createTable(TABLE, SCHEMA); + + Cache tableCache = + restCatalog.sessionCatalog().tableCache().tableCache(); + assertThat(tableCache.estimatedSize()).isZero(); + + BaseTable table = (BaseTable) catalog().loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + TableIdentifier metadataTableIdentifier = + TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); + + BaseMetadataTable metadataTable = + (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier); + + assertThat(tableCache.stats().hitCount()).isEqualTo(1); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThat(table).isNotEqualTo(metadataTable.table()); + assertThat(table.operations().current().metadataFileLocation()) + .isEqualTo(metadataTable.table().operations().current().metadataFileLocation()); + + ResourcePaths paths = + ResourcePaths.forCatalogProperties( + ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); + + Mockito.verify(adapterForRESTServer, times(2)) + .execute(reqMatcher(HTTPMethod.GET, paths.table(TABLE)), any(), any(), any()); + + Mockito.verify(adapterForRESTServer) + .execute( + reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), any(), any(), any()); + } + + @Test + public void testRenameTableInvalidatesTable() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + (catalog) -> + catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), "other_table")), + 0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDropTableInvalidatesTable(boolean purge) { + runTableInvalidationTest( + restCatalog, adapterForRESTServer, (catalog) -> catalog.dropTable(TABLE, purge), 0); + } + + @Test + public void testTableExistViaHeadRequestInvalidatesTable() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + ((catalog) -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + catalog.tableExists(TABLE); + }), + 0); + } + + @Test + public void testTableExistViaGetRequestInvalidatesTable() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + // Configure REST server to answer tableExists query via GET + Mockito.doAnswer( + invocation -> + ConfigResponse.builder() + .withEndpoints( + ImmutableList.of( + Endpoint.V1_LOAD_TABLE, + Endpoint.V1_CREATE_NAMESPACE, + Endpoint.V1_CREATE_TABLE)) + .build()) + .when(adapter) + .execute( + reqMatcher(HTTPMethod.GET, ResourcePaths.config()), + eq(ConfigResponse.class), + any(), + any()); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + runTableInvalidationTest( + catalog, + adapter, + (cat) -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(cat.sessionCatalog().tableCache().tableCache().asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + cat.tableExists(TABLE); + }, + 1); + } + + @Test + public void testLoadTableInvalidatesCache() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + (catalog) -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThatThrownBy(() -> catalog.loadTable(TABLE)) + .isInstanceOf(NoSuchTableException.class) + .hasMessage("Table does not exist: %s", TABLE); + }, + 1); + } + + @Test + public void testLoadTableWithMetadataTableNameInvalidatesCache() { + TableIdentifier metadataTableIdentifier = + TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); + + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + (catalog) -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier)) + .isInstanceOf(NoSuchTableException.class) + .hasMessage("Table does not exist: %s", TABLE); + }, + 1); + + ResourcePaths paths = + ResourcePaths.forCatalogProperties( + ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); + + Mockito.verify(adapterForRESTServer) + .execute( + reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), any(), any(), any()); + } + + private void runTableInvalidationTest( + RESTCatalog catalog, + RESTCatalogAdapter adapterToVerify, + Consumer action, + int loadTableCountFromAction) { + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE); + + Cache tableCache = + catalog.sessionCatalog().tableCache().tableCache(); + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + action.accept(catalog); + + // Check that 'action' invalidates cache + assertThat(tableCache.estimatedSize()).isZero(); + + assertThatThrownBy(() -> catalog.loadTable(TABLE)) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("Table does not exist: %s", TABLE); + + catalog.createTable(TABLE, SCHEMA); + + expectFullTableLoadForLoadTable(TABLE, adapterToVerify); + + BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThat(newTableWithSameName).isNotEqualTo(originalTable); + assertThat(newTableWithSameName.operations().current().metadataFileLocation()) + .isNotEqualTo(originalTable.operations().current().metadataFileLocation()); + + Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction)) + .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + } + + @Test + public void testTableCacheWithMultiSessions() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> adapter, null); + sessionCatalog.initialize("test_session_catalog", Map.of()); + + SessionCatalog.SessionContext otherSessionContext = + new SessionCatalog.SessionContext( + "session_id_2", "user", ImmutableMap.of("credential", "user:12345"), ImmutableMap.of()); + + sessionCatalog.createNamespace(DEFAULT_SESSION_CONTEXT, TABLE.namespace()); + + sessionCatalog.buildTable(DEFAULT_SESSION_CONTEXT, TABLE, SCHEMA).create(); + + expectFullTableLoadForLoadTable(TABLE, adapter); + + sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE); + + Cache tableCache = + sessionCatalog.tableCache().tableCache(); + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + expectFullTableLoadForLoadTable(TABLE, adapter); + + sessionCatalog.loadTable(otherSessionContext, TABLE); + + assertThat(tableCache.asMap()) + .containsOnlyKeys( + SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE), + SessionIdTableId.of(otherSessionContext.sessionId(), TABLE)); + } + + @Test + public void test304NotModifiedResponseWithEmptyTableCache() { + Mockito.doAnswer(invocation -> null) + .when(adapterForRESTServer) + .execute( + reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), + eq(LoadTableResponse.class), + any(), + any()); + + catalog().createNamespace(TABLE.namespace()); + + catalog().createTable(TABLE, SCHEMA); + + catalog().invalidateTable(TABLE); + + // Table is not in the cache and null LoadTableResponse is received + assertThatThrownBy(() -> catalog().loadTable(TABLE)) + .isInstanceOf(RESTException.class) + .hasMessage( + "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s", + HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)); + } + + @Test + public void testTableCacheNotUpdatedWithoutETag() { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + // Wrap the original responseHeaders to not accept ETag. + Consumer> noETagConsumer = + headers -> { + if (!headers.containsKey(HttpHeaders.ETAG)) { + responseHeaders.accept(headers); + } + }; + return super.execute(request, responseType, errorHandler, noETagConsumer); + } + }); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + catalog.loadTable(TABLE); + + assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + } + + @Test + public void testTableCacheIsDisabled() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO", + RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, + "0")); + + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + + expectFullTableLoadForLoadTable(TABLE, adapter); + + catalog.loadTable(TABLE); + + catalog.sessionCatalog().tableCache().tableCache().cleanUp(); + + assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + } + + @Test + public void testFullTableLoadAfterExpiryFromCache() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + FakeTicker ticker = new FakeTicker(); + + TestableRESTCatalog catalog = + new TestableRESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter, ticker); + catalog.initialize("catalog", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + catalog.loadTable(TABLE); + + Cache tableCache = + catalog.sessionCatalog().tableCache().tableCache(); + SessionIdTableId tableCacheKey = + SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); + + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(Duration.ZERO); + + ticker.advance(HALF_OF_TABLE_EXPIRATION); + + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + + ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10))); + + assertThat(tableCache.asMap()).doesNotContainKey(tableCacheKey); + + expectFullTableLoadForLoadTable(TABLE, adapter); + + catalog.loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isEqualTo(0); + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(Duration.ZERO); + } + + @Test + public void testTableCacheAgeDoesNotRefreshesAfterAccess() { + FakeTicker ticker = new FakeTicker(); + + TestableRESTCatalog catalog = + new TestableRESTCatalog( + DEFAULT_SESSION_CONTEXT, config -> new RESTCatalogAdapter(backendCatalog), ticker); + catalog.initialize("catalog", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + catalog.loadTable(TABLE); + + ticker.advance(HALF_OF_TABLE_EXPIRATION); + + Cache tableCache = + catalog.sessionCatalog().tableCache().tableCache(); + SessionIdTableId tableCacheKey = + SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); + + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + + catalog.loadTable(TABLE); + + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); @@ -3807,6 +4357,45 @@ protected RESTSessionCatalog newSessionCatalog( return catalog; } + private void expectFullTableLoadForLoadTable(TableIdentifier ident, RESTCatalogAdapter adapter) { + Answer invocationAssertsFullLoad = + invocation -> { + LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod(); + + assertThat(response).isNotEqualTo(null); + + return response; + }; + + Mockito.doAnswer(invocationAssertsFullLoad) + .when(adapter) + .execute( + reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)), + eq(LoadTableResponse.class), + any(), + any()); + } + + private void expectNotModifiedResponseForLoadTable( + TableIdentifier ident, RESTCatalogAdapter adapter) { + Answer invocationAssertsFullLoad = + invocation -> { + LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod(); + + assertThat(response).isEqualTo(null); + + return response; + }; + + Mockito.doAnswer(invocationAssertsFullLoad) + .when(adapter) + .execute( + reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)), + eq(LoadTableResponse.class), + any(), + any()); + } + static HTTPRequest reqMatcher(HTTPMethod method) { return argThat(req -> req.method() == method); } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index f84197b0f16e..f4f7cf75dc7a 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -859,6 +859,26 @@ public void scanPlanningWithMultiplePartitionSpecs() throws IOException { } } + @Test + void remoteScanPlanningWithFreshnessAwareLoading() throws IOException { + RESTCatalog catalog = scanPlanningCatalog(); + + TableIdentifier tableIdentifier = TableIdentifier.of(NS, "freshness_aware_loading_test"); + restTableFor(catalog, tableIdentifier.name()); + + assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + + // Table is cached with the first loadTable + catalog.loadTable(tableIdentifier); + assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isOne(); + + // Second loadTable is answered from cache + Table table = catalog.loadTable(tableIdentifier); + + // Verify table is RESTTable and newScan() returns RESTTableScan + restTableScanFor(table); + } + // ==================== Endpoint Support Tests ==================== /** Helper class to hold catalog and adapter for endpoint support tests. */ diff --git a/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java new file mode 100644 index 000000000000..cf66c04ade33 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTCatalog.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.catalog.SessionCatalog; + +class TestableRESTCatalog extends RESTCatalog { + private final Ticker ticker; + + TestableRESTCatalog( + SessionCatalog.SessionContext context, + Function, RESTClient> clientBuilder, + Ticker ticker) { + super(context, clientBuilder); + + this.ticker = ticker; + } + + @Override + protected RESTSessionCatalog newSessionCatalog( + Function, RESTClient> clientBuilder) { + // This is called from RESTCatalog's constructor, 'ticker' member is not yet set, we have to + // defer passing it to the session catalog. + return new TestableRESTSessionCatalog(clientBuilder, null); + } + + @Override + public void initialize(String name, Map props) { + ((TestableRESTSessionCatalog) sessionCatalog()).setTicker(ticker); + + super.initialize(name, props); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java new file mode 100644 index 000000000000..492fd998e862 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestableRESTSessionCatalog.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.iceberg.io.FileIO; + +class TestableRESTSessionCatalog extends RESTSessionCatalog { + private Ticker ticker; + + TestableRESTSessionCatalog( + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder) { + super(clientBuilder, ioBuilder); + } + + public void setTicker(Ticker newTicker) { + this.ticker = newTicker; + } + + @Override + protected RESTTableCache createTableCache(Map props) { + return new RESTTableCache(props, ticker); + } +} From 3051bc45d31afdab9dce4bd1dcd22f31ec5afbcc Mon Sep 17 00:00:00 2001 From: Gabor Kaszab Date: Fri, 16 Jan 2026 15:11:22 +0100 Subject: [PATCH 2/2] Addressed comments for RESTTableCache --- .../iceberg/rest/RESTSessionCatalog.java | 10 +-- .../apache/iceberg/rest/RESTTableCache.java | 61 +++++++++---------- .../apache/iceberg/rest/TestRESTCatalog.java | 44 +++++++------ .../iceberg/rest/TestRESTScanPlanning.java | 4 +- 4 files changed, 57 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 995a760ad273..0c4f8a39bfd9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -70,7 +70,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; -import org.apache.iceberg.rest.RESTTableCache.TableSupplierWithETag; +import org.apache.iceberg.rest.RESTTableCache.TableWithETag; import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthManagers; import org.apache.iceberg.rest.auth.AuthSession; @@ -462,7 +462,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { TableIdentifier loadedIdent; Map responseHeaders = Maps.newHashMap(); - TableSupplierWithETag cachedTable = tableCache.getIfPresent(context.sessionId(), identifier); + TableWithETag cachedTable = tableCache.getIfPresent(context.sessionId(), identifier); try { response = @@ -476,7 +476,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { if (response == null) { Preconditions.checkNotNull(cachedTable, "Invalid load table response: null"); - return cachedTable.tableSupplier().get(); + return cachedTable.supplier().get(); } loadedIdent = identifier; @@ -503,7 +503,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { Preconditions.checkNotNull(cachedTable, "Invalid load table response: null"); return MetadataTableUtils.createMetadataTableInstance( - cachedTable.tableSupplier().get(), metadataType); + cachedTable.supplier().get(), metadataType); } loadedIdent = baseIdent; @@ -1457,7 +1457,7 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti .post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler()); } - private static Map headersForLoadTable(TableSupplierWithETag tableWithETag) { + private static Map headersForLoadTable(TableWithETag tableWithETag) { if (tableWithETag == null) { return Map.of(); } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java index 6f2dd7ccfa8f..912173e9eb04 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableCache.java @@ -37,35 +37,7 @@ class RESTTableCache implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(RESTTableCache.class); - @Value.Immutable - interface SessionIdTableId { - String sessionId(); - - TableIdentifier tableIdentifier(); - - static SessionIdTableId of(String sessionId, TableIdentifier ident) { - return ImmutableSessionIdTableId.builder() - .sessionId(sessionId) - .tableIdentifier(ident) - .build(); - } - } - - @Value.Immutable - interface TableSupplierWithETag { - Supplier tableSupplier(); - - String eTag(); - - static TableSupplierWithETag of(Supplier tableSupplier, String eTag) { - return ImmutableTableSupplierWithETag.builder() - .tableSupplier(tableSupplier) - .eTag(eTag) - .build(); - } - } - - private final Cache tableCache; + private final Cache tableCache; RESTTableCache(Map props) { this(props, Ticker.systemTicker()); @@ -100,7 +72,7 @@ static TableSupplierWithETag of(Supplier tableSupplier, String eTag) .build(); } - public TableSupplierWithETag getIfPresent(String sessionId, TableIdentifier identifier) { + public TableWithETag getIfPresent(String sessionId, TableIdentifier identifier) { SessionIdTableId cacheKey = SessionIdTableId.of(sessionId, identifier); return tableCache.getIfPresent(cacheKey); } @@ -111,7 +83,7 @@ public void put( Supplier tableSupplier, String eTag) { tableCache.put( - SessionIdTableId.of(sessionId, identifier), TableSupplierWithETag.of(tableSupplier, eTag)); + SessionIdTableId.of(sessionId, identifier), TableWithETag.of(tableSupplier, eTag)); } public void invalidate(String sessionId, TableIdentifier identifier) { @@ -120,7 +92,7 @@ public void invalidate(String sessionId, TableIdentifier identifier) { } @VisibleForTesting - Cache tableCache() { + Cache cache() { return tableCache; } @@ -129,4 +101,29 @@ public void close() { tableCache.invalidateAll(); tableCache.cleanUp(); } + + @Value.Immutable + interface SessionIdTableId { + String sessionId(); + + TableIdentifier tableIdentifier(); + + static SessionIdTableId of(String sessionId, TableIdentifier ident) { + return ImmutableSessionIdTableId.builder() + .sessionId(sessionId) + .tableIdentifier(ident) + .build(); + } + } + + @Value.Immutable + interface TableWithETag { + Supplier supplier(); + + String eTag(); + + static TableWithETag of(Supplier tableSupplier, String eTag) { + return ImmutableTableWithETag.builder().supplier(tableSupplier).eTag(eTag).build(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index dcb615e84682..6b981c493da0 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -98,7 +98,7 @@ import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; import org.apache.iceberg.rest.RESTTableCache.SessionIdTableId; -import org.apache.iceberg.rest.RESTTableCache.TableSupplierWithETag; +import org.apache.iceberg.rest.RESTTableCache.TableWithETag; import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthManagers; import org.apache.iceberg.rest.auth.AuthSession; @@ -3233,7 +3233,6 @@ public void testCommitStateUnknownNotReconciled() { .satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty()); } - @SuppressWarnings("MethodLength") @Test public void testCustomTableOperationsInjection() throws IOException { AtomicBoolean customTableOpsCalled = new AtomicBoolean(); @@ -3869,8 +3868,8 @@ public void testFreshnessAwareLoading() { catalog().createTable(TABLE, SCHEMA); - Cache tableCache = - restCatalog.sessionCatalog().tableCache().tableCache(); + Cache tableCache = + restCatalog.sessionCatalog().tableCache().cache(); assertThat(tableCache.estimatedSize()).isZero(); expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer); @@ -3892,7 +3891,7 @@ public void testFreshnessAwareLoading() { tableCache .asMap() .get(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)) - .tableSupplier() + .supplier() .get() .operations() .current() @@ -3909,8 +3908,8 @@ public void testFreshnessAwareLoadingMetadataTables() { catalog().createTable(TABLE, SCHEMA); - Cache tableCache = - restCatalog.sessionCatalog().tableCache().tableCache(); + Cache tableCache = + restCatalog.sessionCatalog().tableCache().cache(); assertThat(tableCache.estimatedSize()).isZero(); BaseTable table = (BaseTable) catalog().loadTable(TABLE); @@ -3972,7 +3971,7 @@ public void testTableExistViaHeadRequestInvalidatesTable() { catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); catalog.tableExists(TABLE); @@ -4015,7 +4014,7 @@ public void testTableExistViaGetRequestInvalidatesTable() { catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); // The main catalog still has the table in cache - assertThat(cat.sessionCatalog().tableCache().tableCache().asMap()) + assertThat(cat.sessionCatalog().tableCache().cache().asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); cat.tableExists(TABLE); @@ -4033,7 +4032,7 @@ public void testLoadTableInvalidatesCache() { catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); assertThatThrownBy(() -> catalog.loadTable(TABLE)) @@ -4056,7 +4055,7 @@ public void testLoadTableWithMetadataTableNameInvalidatesCache() { catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().tableCache().asMap()) + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier)) @@ -4085,8 +4084,8 @@ private void runTableInvalidationTest( BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE); - Cache tableCache = - catalog.sessionCatalog().tableCache().tableCache(); + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); assertThat(tableCache.stats().hitCount()).isZero(); assertThat(tableCache.asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); @@ -4137,8 +4136,7 @@ public void testTableCacheWithMultiSessions() { sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE); - Cache tableCache = - sessionCatalog.tableCache().tableCache(); + Cache tableCache = sessionCatalog.tableCache().cache(); assertThat(tableCache.stats().hitCount()).isZero(); assertThat(tableCache.asMap()) .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); @@ -4211,7 +4209,7 @@ public T execute( catalog.loadTable(TABLE); - assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); } @Test @@ -4231,15 +4229,15 @@ public void testTableCacheIsDisabled() { catalog.createTable(TABLE, SCHEMA); - assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); expectFullTableLoadForLoadTable(TABLE, adapter); catalog.loadTable(TABLE); - catalog.sessionCatalog().tableCache().tableCache().cleanUp(); + catalog.sessionCatalog().tableCache().cache().cleanUp(); - assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); } @Test @@ -4258,8 +4256,8 @@ public void testFullTableLoadAfterExpiryFromCache() { catalog.loadTable(TABLE); - Cache tableCache = - catalog.sessionCatalog().tableCache().tableCache(); + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); SessionIdTableId tableCacheKey = SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); @@ -4310,8 +4308,8 @@ public void testTableCacheAgeDoesNotRefreshesAfterAccess() { ticker.advance(HALF_OF_TABLE_EXPIRATION); - Cache tableCache = - catalog.sessionCatalog().tableCache().tableCache(); + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); SessionIdTableId tableCacheKey = SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index f4f7cf75dc7a..0b1453682b75 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -866,11 +866,11 @@ void remoteScanPlanningWithFreshnessAwareLoading() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(NS, "freshness_aware_loading_test"); restTableFor(catalog, tableIdentifier.name()); - assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isZero(); + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); // Table is cached with the first loadTable catalog.loadTable(tableIdentifier); - assertThat(catalog.sessionCatalog().tableCache().tableCache().estimatedSize()).isOne(); + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isOne(); // Second loadTable is answered from cache Table table = catalog.loadTable(tableIdentifier);