Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/BaseHTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ public <T extends RESTResponse> T get(
return execute(request, responseType, errorHandler, h -> {}, parserContext);
}

@Override
public <T extends RESTResponse> T get(
String path,
Map<String, String> queryParams,
Class<T> responseType,
Map<String, String> headers,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
HTTPRequest request = buildRequest(HTTPMethod.GET, path, queryParams, headers, null);
return execute(request, responseType, errorHandler, responseHeaders);
}

@Override
public <T extends RESTResponse> T post(
String path,
Expand Down
24 changes: 24 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,30 @@ default <T extends RESTResponse> T get(
return get(path, queryParams, responseType, headers, errorHandler);
}

default <T extends RESTResponse> T get(
String path,
Map<String, String> queryParams,
Class<T> responseType,
Supplier<Map<String, String>> headers,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
return get(path, queryParams, responseType, headers.get(), errorHandler, responseHeaders);
}

default <T extends RESTResponse> T get(
String path,
Map<String, String> queryParams,
Class<T> responseType,
Map<String, String> headers,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
if (null != responseHeaders) {
throw new UnsupportedOperationException("Returning response headers is not supported");
}

return get(path, queryParams, responseType, headers, errorHandler);
}

<T extends RESTResponse> T get(
String path,
Map<String, String> queryParams,
Expand Down
22 changes: 20 additions & 2 deletions core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
Expand Down Expand Up @@ -287,7 +288,23 @@ public RESTClient withAuthSession(AuthSession session) {

@SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
public <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
Route route,
Map<String, String> vars,
HTTPRequest httpRequest,
Class<T> responseType,
Consumer<Map<String, String>> responseHeaders) {
Object body = httpRequest.body();

if (responseHeaders != null) {
responseHeaders.accept(ImmutableMap.of("X-Request-Id", String.valueOf(System.nanoTime())));

// Set route specific response headers
switch (route) {
case CONFIG:
responseHeaders.accept(ImmutableMap.of("X-Iceberg-Version", IcebergBuild.version()));
}
Comment on lines +298 to +305
Copy link
Contributor Author

@dramaticlly dramaticlly Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this is public class in test package and shared in many tests, want to leverage this to ensure we cover the basic test of response header. Please let me know if there's better place to set response headers before features like idempotent etag is implemented.

}

switch (route) {
case TOKENS:
return castResponse(responseType, handleOAuthRequest(body));
Expand Down Expand Up @@ -625,7 +642,8 @@ protected <T extends RESTResponse> T execute(
vars.putAll(request.queryParameters());
vars.putAll(routeAndVars.second());

return handleRequest(routeAndVars.first(), vars.build(), request.body(), responseType);
return handleRequest(
routeAndVars.first(), vars.build(), request, responseType, responseHeaders);

} catch (RuntimeException e) {
configureResponseFromException(e, errorBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.RESTCatalogAdapter.Route;
Expand All @@ -53,7 +54,7 @@ public class RESTCatalogServlet extends HttpServlet {
private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServlet.class);

private final RESTCatalogAdapter restCatalogAdapter;
private final Map<String, String> responseHeaders =
private final Map<String, String> defaultResponseHeaderMap =
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());

public RESTCatalogServlet(RESTCatalogAdapter restCatalogAdapter) {
Expand Down Expand Up @@ -87,7 +88,7 @@ protected void doDelete(HttpServletRequest request, HttpServletResponse response
protected void execute(ServletRequestContext context, HttpServletResponse response)
throws IOException {
response.setStatus(HttpServletResponse.SC_OK);
responseHeaders.forEach(response::setHeader);
defaultResponseHeaderMap.forEach(response::setHeader);

if (context.error().isPresent()) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
Expand All @@ -104,9 +105,12 @@ protected void execute(ServletRequestContext context, HttpServletResponse respon
context.queryParams(),
context.headers(),
context.body());
Map<String, String> responseHeaders = Maps.newHashMap();
Object responseBody =
restCatalogAdapter.execute(
request, context.route().responseClass(), handle(response), h -> {});
request, context.route().responseClass(), handle(response), responseHeaders::putAll);

responseHeaders.forEach(response::setHeader);

if (responseBody != null) {
RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

/**
* * Exercises the RESTClient interface, specifically over a mocked-server using the actual
* HttpRESTClient code.
* HTTPClient code.
*/
public class TestHTTPClient {

Expand Down Expand Up @@ -602,7 +602,8 @@ private static Item doExecuteRequest(
case POST:
return restClient.post(path, body, Item.class, headers, onError, responseHeaders);
case GET:
return restClient.get(path, Item.class, headers, onError);
return restClient.get(
path, ImmutableMap.of(), Item.class, headers, onError, responseHeaders);
case HEAD:
restClient.head(path, headers, onError);
return null;
Expand Down
54 changes: 48 additions & 6 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -2207,23 +2208,26 @@ public void testPaginationForListNamespaces(int numberOfItems) {
eq(RESTCatalogAdapter.Route.LIST_NAMESPACES),
eq(ImmutableMap.of("pageToken", "", "pageSize", "10")),
any(),
eq(ListNamespacesResponse.class));
eq(ListNamespacesResponse.class),
any());

// verify second request with updated pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_NAMESPACES),
eq(ImmutableMap.of("pageToken", "10", "pageSize", "10")),
any(),
eq(ListNamespacesResponse.class));
eq(ListNamespacesResponse.class),
any());

// verify third request with update pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_NAMESPACES),
eq(ImmutableMap.of("pageToken", "20", "pageSize", "10")),
any(),
eq(ListNamespacesResponse.class));
eq(ListNamespacesResponse.class),
any());
}

@ParameterizedTest
Expand Down Expand Up @@ -2269,23 +2273,26 @@ public void testPaginationForListTables(int numberOfItems) {
eq(RESTCatalogAdapter.Route.LIST_TABLES),
eq(ImmutableMap.of("pageToken", "", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());

// verify second request with updated pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_TABLES),
eq(ImmutableMap.of("pageToken", "10", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());

// verify third request with update pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_TABLES),
eq(ImmutableMap.of("pageToken", "20", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());
}

@Test
Expand Down Expand Up @@ -2698,6 +2705,41 @@ public void testTableExistsFallbackToGETRequestWithLegacyServer() {
verifyTableExistsFallbackToGETRequest(ConfigResponse.builder().build());
}

@Test
public void testResponseHeadersConsumer() {
Map<String, String> capturedHeaders = Maps.newConcurrentMap();

RESTCatalogAdapter captureResponseHeaderAdapter =
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(
HTTPRequest request,
Class<T> responseType,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
// Capture response headers for verification
Consumer<Map<String, String>> headerCapture = capturedHeaders::putAll;
return super.execute(request, responseType, errorHandler, headerCapture);
}
};

RESTCatalog catalog = catalog(captureResponseHeaderAdapter);

// Trigger a config request
catalog.initialize("test", ImmutableMap.of(CatalogProperties.URI, "ignored"));

// Verify config headers are present
assertThat(capturedHeaders)
.containsEntry("X-Iceberg-Version", IcebergBuild.version())
.containsKey("X-Request-Id");

capturedHeaders.clear();
catalog.listNamespaces();

// Verify default header is present
assertThat(capturedHeaders).doesNotContainKey("X-Iceberg-Version").containsKey("X-Request-Id");
}

private RESTCatalog catalog(RESTCatalogAdapter adapter) {
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,26 @@ public void testPaginationForListViews(int numberOfItems) {
eq(RESTCatalogAdapter.Route.LIST_VIEWS),
eq(ImmutableMap.of("pageToken", "", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());

// verify second request with update pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_VIEWS),
eq(ImmutableMap.of("pageToken", "10", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());

// verify third request with update pageToken
Mockito.verify(adapter)
.handleRequest(
eq(RESTCatalogAdapter.Route.LIST_VIEWS),
eq(ImmutableMap.of("pageToken", "20", "pageSize", "10", "namespace", namespaceName)),
any(),
eq(ListTablesResponse.class));
eq(ListTablesResponse.class),
any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
Expand Down Expand Up @@ -52,13 +53,18 @@ public void createCatalog() throws Exception {

@Override
public <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
Route route,
Map<String, String> vars,
HTTPRequest httpRequest,
Class<T> responseType,
Consumer<Map<String, String>> responseHeaders) {

if (CONFIG == route) {
// simulate a legacy server that doesn't send back supported endpoints
return castResponse(responseType, ConfigResponse.builder().build());
}

return super.handleRequest(route, vars, body, responseType);
return super.handleRequest(route, vars, httpRequest, responseType, responseHeaders);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.rest;

import java.util.Map;
import java.util.function.Consumer;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.gcp.GCPProperties;
Expand All @@ -38,8 +39,12 @@ class RESTServerCatalogAdapter extends RESTCatalogAdapter {

@Override
public <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
T restResponse = super.handleRequest(route, vars, body, responseType);
Route route,
Map<String, String> vars,
HTTPRequest httpRequest,
Class<T> responseType,
Consumer<Map<String, String>> responseHeaders) {
T restResponse = super.handleRequest(route, vars, httpRequest, responseType, responseHeaders);

if (restResponse instanceof LoadTableResponse) {
if (PropertyUtil.propertyAsBoolean(
Expand Down