Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,16 @@ protected <T extends RESTResponse> T execute(

// Skip parsing the response stream for any successful request not expecting a response body
if (emptyBody(response, responseType)) {
if (response.getCode() == HttpStatus.SC_NOT_MODIFIED
&& !req.headers().contains(HttpHeaders.IF_NONE_MATCH)) {
// 304-NOT_MODIFIED is used for freshness-aware loading and requires an ETag sent to the
// server via IF_NONE_MATCH header in the request. If no ETag was sent, we shouldn't
// receive a 304.
throw new RESTException(
"Invalid (NOT_MODIFIED) response for request: method=%s, path=%s",
req.method(), req.path());
}

return null;
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public void initialize(String name, Map<String, String> props) {
sessionCatalog.initialize(name, props);
}

protected RESTSessionCatalog sessionCatalog() {
return sessionCatalog;
}

@Override
public String name() {
return sessionCatalog.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.rest;

import java.util.concurrent.TimeUnit;

public final class RESTCatalogProperties {

private RESTCatalogProperties() {}
Expand All @@ -43,6 +45,15 @@ private RESTCatalogProperties() {}

public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id";

// Properties that control the behaviour of the table cache used for freshness-aware table
// loading.
public static final String TABLE_CACHE_EXPIRE_AFTER_WRITE_MS =
"rest-table-cache.expire-after-write-ms";
public static final long TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

public static final String TABLE_CACHE_MAX_ENTRIES = "rest-table-cache.max-entries";
public static final int TABLE_CACHE_MAX_ENTRIES_DEFAULT = 100;

public enum SnapshotMode {
ALL,
REFS
Expand Down
168 changes: 134 additions & 34 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
Expand Down Expand Up @@ -60,13 +62,15 @@
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
import org.apache.iceberg.rest.RESTTableCache.TableWithETag;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
Expand Down Expand Up @@ -165,6 +169,8 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private Supplier<Map<String, String>> mutationHeaders = Map::of;
private String namespaceSeparator = null;

private RESTTableCache tableCache;

public RESTSessionCatalog() {
this(
config ->
Expand Down Expand Up @@ -277,9 +283,22 @@ public void initialize(String name, Map<String, String> unresolved) {
mergedProps,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT);

this.tableCache = createTableCache(mergedProps);
this.closeables.addCloseable(this.tableCache);

super.initialize(name, mergedProps);
}

protected RESTTableCache createTableCache(Map<String, String> props) {
return new RESTTableCache(props);
}

@VisibleForTesting
RESTTableCache tableCache() {
return tableCache;
}

@Override
public void setConf(Object newConf) {
this.conf = newConf;
Expand Down Expand Up @@ -332,6 +351,8 @@ public boolean dropTable(SessionContext context, TableIdentifier identifier) {
return true;
} catch (NoSuchTableException e) {
return false;
} finally {
invalidateTable(context, identifier);
}
}

Expand All @@ -353,6 +374,8 @@ public boolean purgeTable(SessionContext context, TableIdentifier identifier) {
return true;
} catch (NoSuchTableException e) {
return false;
} finally {
invalidateTable(context, identifier);
}
}

Expand All @@ -370,6 +393,8 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent
client
.withAuthSession(contextualSession)
.post(paths.rename(), request, null, mutationHeaders, ErrorHandlers.tableErrorHandler());

invalidateTable(context, from);
}

@Override
Expand All @@ -384,9 +409,15 @@ public boolean tableExists(SessionContext context, TableIdentifier identifier) {
return true;
} else {
// fallback in order to work with 1.7.x and older servers
return super.tableExists(context, identifier);
if (!super.tableExists(context, identifier)) {
invalidateTable(context, identifier);
return false;
}

return true;
}
} catch (NoSuchTableException e) {
invalidateTable(context, identifier);
return false;
}
}
Expand All @@ -396,7 +427,11 @@ private static Map<String, String> snapshotModeToParam(SnapshotMode mode) {
}

private LoadTableResponse loadInternal(
SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
SessionContext context,
TableIdentifier identifier,
SnapshotMode mode,
Map<String, String> headers,
Consumer<Map<String, String>> responseHeaders) {
Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE);
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
return client
Expand All @@ -405,8 +440,9 @@ private LoadTableResponse loadInternal(
paths.table(identifier),
snapshotModeToParam(mode),
LoadTableResponse.class,
Map.of(),
ErrorHandlers.tableErrorHandler());
headers,
ErrorHandlers.tableErrorHandler(),
responseHeaders);
}

@Override
Expand All @@ -424,8 +460,25 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
MetadataTableType metadataType;
LoadTableResponse response;
TableIdentifier loadedIdent;

Map<String, String> responseHeaders = Maps.newHashMap();
TableWithETag cachedTable = tableCache.getIfPresent(context.sessionId(), identifier);

try {
response = loadInternal(context, identifier, snapshotMode);
response =
loadInternal(
context,
identifier,
snapshotMode,
headersForLoadTable(cachedTable),
responseHeaders::putAll);

if (response == null) {
Preconditions.checkNotNull(cachedTable, "Invalid load table response: null");

return cachedTable.supplier().get();
}

loadedIdent = identifier;
metadataType = null;

Expand All @@ -435,14 +488,33 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
// attempt to load a metadata table using the identifier's namespace as the base table
TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels());
try {
response = loadInternal(context, baseIdent, snapshotMode);
responseHeaders.clear();
cachedTable = tableCache.getIfPresent(context.sessionId(), baseIdent);

response =
loadInternal(
context,
baseIdent,
snapshotMode,
headersForLoadTable(cachedTable),
Copy link
Member

@XJDKC XJDKC Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach requires the table to have an entry in the cachedTable, so that we can retrieve its ETag and include it in the loadTable request. However, what if the ETag is cached externally, and the caller wants to supply the ETag directly when loading the table?

This use case is valid and common, the caller may store the ETag in somewhere for performance optimizations and expect to use it in the next loadTable request (may not use the same client).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking a look, @XJDKC !
External ETags weren't in the scope of the original design. In fact the ETags for this functionality are deliberately kept 'under the hood' and not exposed to the clients/engines.
The use-case you describe wouldn't simply require the ETag to be passed in to loadTable() but also to have some mechanism to answer with a table object in case of a 304-NotModified. Would you simply expect null from loadTable() and have a cache on the caller side to answer such queries?

Copy link
Member

@XJDKC XJDKC Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the caller might maintain a cache on the caller side. Many catalogs now support multiplexing, so the caller may have a cache to store the latest table metadata info.

For this use case, if the caller got an null pointer or something that indicates that the table is not modified, the caller needs to handle it by itself. e.g., don't need to proceed.

Does it make sense to allow the caller to pass in the ETag in the SessionContext? Maybe via a property in the property map?

(Noticed that, unfortunately, for RESTCatalog::loadTable, it doesn't allow the caller to pass in a SessionContext at table level, is it possible to add another interface?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation, @XJDKC !
I see your point. At the moment ETag handling, and answering from cache is deliberately kept internal to RESTSessionCatalog. One of the requirements when designing this was to not change existing interfaces or introduce new ones. Maybe we can re-visit this later on, but for simplicity and keeping the API clean I don't think we should provide a way to provide external ETags through the loadTable API
Additional observation I have is ETags are coming from the REST server, and are kept within the RESTSessionCatalog, but not persisted into the table object, nor into the table ops, so the users of loadTable API won't have a way to get the ETag of a table other than calculating it themselves. However, that goes against the purpose of an ETag, as it should be opaque to the clients.

Copy link
Member

@XJDKC XJDKC Nov 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fully understand your points, they make sense!
But just to add some context from real-world use cases: There are scenarios where the query engine tries to optimize loadTable performance by persisting the ETag somewhere. That way, when the engine restarts, it doesn't have to perform a full loadTable again and can instead reuse the previously fetched version.

It would be great if we could provide some flexibility or hooks for customization when implementing this!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these users call the REST catalog API directly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks you for describing the use case @XJDKC !
When the engine restarts, it's not enough to persist the ETag somewhere because in order to answer a loadTable call you need a table object too.
Anyway, I still think that the scope of this PR should be as narrow as possible, and not exposing ETags is what we agreed on in the proposal doc. Once this work is done, further changes could be initiated.

responseHeaders::putAll);

if (response == null) {
Preconditions.checkNotNull(cachedTable, "Invalid load table response: null");

return MetadataTableUtils.createMetadataTableInstance(
cachedTable.supplier().get(), metadataType);
}

loadedIdent = baseIdent;
} catch (NoSuchTableException ignored) {
// the base table does not exist
invalidateTable(context, baseIdent);
throw original;
}
} else {
// name is not a metadata table
invalidateTable(context, identifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels redundant. We've already loaded this identifier and determined it's not in the cache, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically it's feasible that we get here and the table exists in the cache. See TestRESTCatalog.testLoadTableInvalidatesCache. Steps:

  1. We load the table. This populates the cache
  2. With another catalog instance (maybe through another engine for instance) we drop the table. The table will remain in the cache in this catalog.
  3. Another load table will fail because the table doesn't exist anymore, while the table still in the cache. I think it makes sense to invalidate the table in this case.

throw original;
}
}
Expand All @@ -461,7 +533,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
.setPreviousFileLocation(null)
.setSnapshotsSupplier(
() ->
loadInternal(context, finalIdentifier, SnapshotMode.ALL)
loadInternal(context, finalIdentifier, SnapshotMode.ALL, Map.of(), h -> {})
.tableMetadata()
.snapshots())
.discardChanges()
Expand All @@ -470,38 +542,52 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
tableMetadata = response.tableMetadata();
}

List<Credential> credentials = response.credentials();
RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
newTableOps(
tableClient,
paths.table(finalIdentifier),
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
tableMetadata,
endpoints);
Supplier<BaseTable> tableSupplier =
createTableSupplier(
finalIdentifier, tableMetadata, context, tableClient, tableConf, credentials);

trackFileIO(ops);

// RestTable should only be returned for non-metadata tables, because client would
// not have access to metadata files for example manifests, since all it needs is catalog.
if (metadataType == null) {
RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient);
if (restTable != null) {
return restTable;
}
String eTag = responseHeaders.getOrDefault(HttpHeaders.ETAG, null);
if (eTag != null) {
tableCache.put(context.sessionId(), finalIdentifier, tableSupplier, eTag);
}

BaseTable table =
new BaseTable(
ops,
fullTableName(finalIdentifier),
metricsReporter(paths.metrics(finalIdentifier), tableClient));
if (metadataType != null) {
return MetadataTableUtils.createMetadataTableInstance(table, metadataType);
return MetadataTableUtils.createMetadataTableInstance(tableSupplier.get(), metadataType);
}

return table;
return tableSupplier.get();
}

private Supplier<BaseTable> createTableSupplier(
TableIdentifier identifier,
TableMetadata tableMetadata,
SessionContext context,
RESTClient tableClient,
Map<String, String> tableConf,
List<Credential> credentials) {
return () -> {
RESTTableOperations ops =
newTableOps(
tableClient,
paths.table(identifier),
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, credentials),
tableMetadata,
endpoints);

trackFileIO(ops);

RESTTable table = restTableForScanPlanning(ops, identifier, tableClient);
if (table != null) {
return table;
}

return new BaseTable(
ops, fullTableName(identifier), metricsReporter(paths.metrics(identifier), tableClient));
};
}

private RESTTable restTableForScanPlanning(
Expand Down Expand Up @@ -546,7 +632,9 @@ public Catalog.TableBuilder buildTable(
}

@Override
public void invalidateTable(SessionContext context, TableIdentifier ident) {}
public void invalidateTable(SessionContext context, TableIdentifier ident) {
tableCache.invalidate(context.sessionId(), ident);
}

@Override
public Table registerTable(
Expand Down Expand Up @@ -906,7 +994,8 @@ public Transaction replaceTransaction() {
throw new AlreadyExistsException("View with same name already exists: %s", ident);
}

LoadTableResponse response = loadInternal(context, ident, snapshotMode);
LoadTableResponse response = loadInternal(context, ident, snapshotMode, Map.of(), h -> {});

String fullName = fullTableName(ident);

Map<String, String> tableConf = response.config();
Expand Down Expand Up @@ -1368,6 +1457,17 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti
.post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler());
}

private static Map<String, String> headersForLoadTable(TableWithETag tableWithETag) {
if (tableWithETag == null) {
return Map.of();
}

String eTag = tableWithETag.eTag();
Preconditions.checkArgument(eTag != null, "Invalid ETag: null");

return Map.of(HttpHeaders.IF_NONE_MATCH, eTag);
}

private class RESTViewBuilder implements ViewBuilder {
private final SessionContext context;
private final TableIdentifier identifier;
Expand Down
Loading