Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ private static boolean isSuccessful(CloseableHttpResponse response) {
int code = response.getCode();
return code == HttpStatus.SC_OK
|| code == HttpStatus.SC_ACCEPTED
|| code == HttpStatus.SC_NO_CONTENT;
|| code == HttpStatus.SC_NO_CONTENT
|| code == HttpStatus.SC_NOT_MODIFIED;
}

private static ErrorResponse buildDefaultErrorResponse(CloseableHttpResponse response) {
Expand Down Expand Up @@ -324,8 +325,7 @@ protected <T extends RESTResponse> T execute(
responseHeaders.accept(respHeaders);

// Skip parsing the response stream for any successful request not expecting a response body
if (response.getCode() == HttpStatus.SC_NO_CONTENT
|| (responseType == null && isSuccessful(response))) {
if (emptyBody(response, responseType)) {
return null;
}

Expand Down Expand Up @@ -360,6 +360,13 @@ protected <T extends RESTResponse> T execute(
}
}

private <T extends RESTResponse> boolean emptyBody(
CloseableHttpResponse response, Class<T> responseType) {
return response.getCode() == HttpStatus.SC_NO_CONTENT
|| response.getCode() == HttpStatus.SC_NOT_MODIFIED
Comment on lines +365 to +366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the extra code check here? Wouldn't the isSuccessful already cover this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is required here because this one triggers returning early with. Without this we'd run into an "Invalid (null) response body" RESTException.
However, the additional check into isSuccessful is not necessary required (same would be tru for SC_NO_CONTENT too), I think to be future proof it semantically makes sense for an isSuccessful function to return true for 304 too even though the current code returns before calling that function. WDYT?

Additionally, when I experimented around this area I noticed that the current test injects RESTCatalogAdapter directly into RESTSessionCatalog avoiding this code path through HTTPClient. Let me rethink the test a little bit so that we could go through the whole path.

|| (responseType == null && isSuccessful(response));
}

@Override
public void close() throws IOException {
// Do not close the AuthSession as it's managed by the owner of this HTTPClient.
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/HTTPHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -51,6 +52,11 @@ default Set<HTTPHeader> entries(String name) {
.collect(Collectors.toSet());
}

/** Returns the first entry in this group for the given name (case-insensitive). */
default Optional<HTTPHeader> firstEntry(String name) {
return entries().stream().filter(header -> header.name().equalsIgnoreCase(name)).findFirst();
}

/** Returns whether this group contains an entry with the given name (case-insensitive). */
default boolean contains(String name) {
return entries().stream().anyMatch(header -> header.name().equalsIgnoreCase(name));
Expand Down
13 changes: 11 additions & 2 deletions core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -279,8 +280,16 @@ public <T extends RESTResponse> T handleRequest(
tableIdentFromPathVars(vars),
snapshotModeFromQueryParams(httpRequest.queryParameters()));

responseHeaders.accept(
ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation())));
Optional<HTTPHeaders.HTTPHeader> ifNoneMatchHeader =
httpRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH);

String eTag = ETagProvider.of(response.metadataLocation());

if (ifNoneMatchHeader.isPresent() && eTag.equals(ifNoneMatchHeader.get().value())) {
return null;
}

responseHeaders.accept(ImmutableMap.of(HttpHeaders.ETAG, eTag));

return castResponse(responseType, response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ protected void execute(ServletRequestContext context, HttpServletResponse respon

if (responseBody != null) {
RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody);
} else {
Pair<Route, Map<String, String>> routeAndVars =
Route.from(request.method(), request.path());
if (routeAndVars != null) {
Route route = routeAndVars.first();
if (route == Route.LOAD_TABLE) {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
}
}
} catch (RESTException e) {
LOG.error("Error processing REST request", e);
Expand Down
118 changes: 94 additions & 24 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
private RESTCatalog restCatalog;
private InMemoryCatalog backendCatalog;
private Server httpServer;
private RESTCatalogAdapter adapterForRESTServer;

@BeforeEach
public void createCatalog() throws Exception {
Expand All @@ -139,34 +140,36 @@ public void createCatalog() throws Exception {
"test-header",
"test-value"));

RESTCatalogAdapter adaptor =
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(
HTTPRequest request,
Class<T> responseType,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
// this doesn't use a Mockito spy because this is used for catalog tests, which have
// different method calls
if (!ResourcePaths.tokens().equals(request.path())) {
if (ResourcePaths.config().equals(request.path())) {
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
} else {
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
adapterForRESTServer =
Mockito.spy(
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(
HTTPRequest request,
Class<T> responseType,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
// this doesn't use a Mockito spy because this is used for catalog tests, which have
// different method calls
if (!ResourcePaths.tokens().equals(request.path())) {
if (ResourcePaths.config().equals(request.path())) {
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
} else {
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
}
}
Object body = roundTripSerialize(request.body(), "request");
HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build();
T response = super.execute(req, responseType, errorHandler, responseHeaders);
T responseAfterSerialization = roundTripSerialize(response, "response");
return responseAfterSerialization;
}
}
Object body = roundTripSerialize(request.body(), "request");
HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build();
T response = super.execute(req, responseType, errorHandler, responseHeaders);
T responseAfterSerialization = roundTripSerialize(response, "response");
return responseAfterSerialization;
}
};
});

ServletContextHandler servletContext =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContext.addServlet(new ServletHolder(new RESTCatalogServlet(adaptor)), "/*");
servletContext.addServlet(
new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*");
servletContext.setHandler(new GzipHandler());

this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
Expand Down Expand Up @@ -2884,6 +2887,73 @@ public void testETagWithRegisterTable() {
assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag);
}

@SuppressWarnings("checkstyle:AssertThatThrownByWithMessageCheck")
@Test
public void testNotModified() {
catalog().createNamespace(TABLE.namespace());

Table table = catalog().createTable(TABLE, SCHEMA);

String eTag =
ETagProvider.of(((BaseTable) table).operations().current().metadataFileLocation());

Mockito.doAnswer(
invocation -> {
HTTPRequest originalRequest = invocation.getArgument(0);

HTTPHeaders extendedHeaders =
ImmutableHTTPHeaders.copyOf(originalRequest.headers())
.putIfAbsent(
ImmutableHTTPHeader.builder()
.name(HttpHeaders.IF_NONE_MATCH)
.value(eTag)
.build());

ImmutableHTTPRequest extendedRequest =
ImmutableHTTPRequest.builder()
.from(originalRequest)
.headers(extendedHeaders)
.build();

return adapterForRESTServer.execute(
extendedRequest,
LoadTableResponse.class,
invocation.getArgument(2),
invocation.getArgument(3),
ParserContext.builder().build());
})
.when(adapterForRESTServer)
.execute(
reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
eq(LoadTableResponse.class),
any(),
any());

// TODO: This won't throw when client side of freshness-aware loading is implemented
assertThatThrownBy(() -> catalog().loadTable(TABLE)).isInstanceOf(NullPointerException.class);

TableIdentifier metadataTableIdentifier =
TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions");

// TODO: This won't throw when client side of freshness-aware loading is implemented
assertThatThrownBy(() -> catalog().loadTable(metadataTableIdentifier))
.isInstanceOf(NullPointerException.class);

Mockito.verify(adapterForRESTServer, times(2))
.execute(
reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)),
eq(LoadTableResponse.class),
any(),
any());

verify(adapterForRESTServer)
.execute(
reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(metadataTableIdentifier)),
any(),
any(),
any());
}

private RESTCatalog catalogWithResponseHeaders(Map<String, String> respHeaders) {
RESTCatalogAdapter adapter =
new RESTCatalogAdapter(backendCatalog) {
Expand Down