diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java
index b6d02eab00..07d050716a 100644
--- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java
@@ -41,6 +41,8 @@
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
+import com.google.cloud.hadoop.util.RetryHttpInitializer;
+import com.google.cloud.hadoop.util.RetryHttpInitializerOptions;
 import com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.ErrorResponses;
 import com.google.common.collect.ImmutableMap;
 import java.io.FileNotFoundException;
@@ -52,6 +54,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -496,6 +499,31 @@ public void afterRetry_subsequentReads_succeed() throws IOException {
     assertThat(readChannel.generation()).isEqualTo(generation);
   }
 
+  @Test
+  public void retried_request_same_invocationId() throws IOException {
+    long generation = 5L;
+    MockHttpTransport transport =
+        mockTransport(
+            jsonErrorResponse(ErrorResponses.RATE_LIMITED),
+            jsonDataResponse(newStorageObject(BUCKET_NAME, OBJECT_NAME).setGeneration(generation)));
+    TrackingHttpRequestInitializer requestsTracker =
+        new TrackingHttpRequestInitializer(
+            new RetryHttpInitializer(null, RetryHttpInitializerOptions.builder().build()));
+    Storage storage = new Storage(transport, GsonFactory.getDefaultInstance(), requestsTracker);
+
+    GoogleCloudStorageReadOptions options =
+        GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build();
+    GoogleCloudStorageReadChannel readChannel = createReadChannel(storage, options, generation);
+
+    assertThat(readChannel.size()).isNotEqualTo(0);
+    List<String> invocationIds = requestsTracker.getAllRequestInvocationIds();
+    // The request is retried exactly once, so two requests are tracked in total.
+    assertThat(invocationIds).hasSize(2);
+    Set<String> uniqueInvocationIds = Set.copyOf(invocationIds);
+    // A retried request keeps its invocationId, so the set contains a single element.
+    assertThat(uniqueInvocationIds).hasSize(1);
+  }
+
   @Test
   public void read_gzipEncoded_shouldReadAllBytes() throws IOException {
     byte[] testData = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java
index 01c6ca0eb4..5d6e36b03b 100644
--- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java
@@ -19,10 +19,14 @@
 import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.GOOGLEAPIS_ENDPOINT;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.function.Predicate.not;
 
 import com.google.api.client.http.HttpExecuteInterceptor;
+import com.google.api.client.http.HttpHeaders;
 import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpRequestInitializer;
+import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -169,12 +173,49 @@ public ImmutableList getAllRequestStrings() {
         .collect(toImmutableList());
   }
 
+  /** Returns the invocation IDs of the tracked requests, skipping requests that carry none. */
+  public ImmutableList<String> getAllRequestInvocationIds() {
+    return requests.stream()
+        .map(r -> getInvocationId(r.getHeaders()))
+        .filter(not(Strings::isNullOrEmpty))
+        .collect(toImmutableList());
+  }
+
   public ImmutableList getAllRawRequestStrings() {
     return requests.stream()
         .map(GoogleCloudStorageIntegrationHelper::requestToString)
         .collect(toImmutableList());
   }
 
+  /**
+   * Extracts the invocation ID from the {@code x-goog-api-client} header.
+   *
+   * @return the invocation ID, or {@code null} if the header or the ID entry is missing
+   */
+  private String getInvocationId(HttpHeaders header) {
+    String apiClientHeader = (String) header.get(InvocationIdInterceptor.GOOG_API_CLIENT);
+    if (apiClientHeader == null) {
+      return null;
+    }
+    // This is what the header value looks like:
+    // x-goog-api-client -> gl-java/11.0.12 gdcl/1.32.2 mac-os-x/12.5
+    // gccl-invocation-id/9ad3804c-fdc1-4cb1-8337-5cf6ae1829b5
+    int prefixIndex = apiClientHeader.indexOf(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
+    if (prefixIndex < 0) {
+      return null;
+    }
+    int beginIndex = prefixIndex + InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX.length();
+    // The ID ends at the nearest delimiter (space or comma), or at the end of the value.
+    int endIndex = apiClientHeader.length();
+    for (char delimiter : new char[] {' ', ','}) {
+      int delimiterIndex = apiClientHeader.indexOf(delimiter, beginIndex);
+      if (delimiterIndex >= 0) {
+        endIndex = Math.min(endIndex, delimiterIndex);
+      }
+    }
+    return apiClientHeader.substring(beginIndex, endIndex);
+  }
+
   private String replacePageTokenWithId(String request, AtomicLong pageTokenId) {
     return replaceRequestParams
         ? 
replaceWithId(request, PAGE_TOKEN_PARAM_PATTERN, "pageToken=token_", pageTokenId) diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java index 102b24c9a2..5d5dde953a 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java @@ -104,6 +104,9 @@ public void open_lazyInit_whenFastFailOnNotFound_isFalse() throws IOException { assertThat(readChannel.size()).isEqualTo(expectedSize); } + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( getRequestString( @@ -127,6 +130,9 @@ public void open_withItemInfo() throws IOException { assertThat(readChannel.size()).isEqualTo(expectedSize); } + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()).isEmpty(); } @@ -147,6 +153,10 @@ public void writeLargeObject_withSmallUploadChunk() throws IOException { partitionsCount); assertObjectContent(helperGcs, resourceId, partition, partitionsCount); + + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactlyElementsIn( getExpectedRequestsForCreateObject( @@ -217,6 +227,9 @@ public void writeObject_withNonAlignedUploadChunk() throws IOException { assertObjectContent(helperGcs, resourceId, partition, partitionsCount); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + 
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactlyElementsIn( getExpectedRequestsForCreateObject( @@ -254,6 +267,9 @@ public void conflictingWrites_noOverwrite_lastFails() throws IOException { assertObjectContent(helperGcs, resourceId, bytesToWrite, /* expectedBytesCount= */ 1); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( getRequestString(resourceId.getBucketName(), resourceId.getObjectName()), @@ -269,6 +285,9 @@ public void conflictingWrites_noOverwrite_lastFails() throws IOException { /* uploadId= */ 1)) .inOrder(); + assertThat(trackingGcs2.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs2.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs2.requestsTracker.getAllRequestStrings()) .containsExactly( getRequestString(resourceId.getBucketName(), resourceId.getObjectName()), @@ -306,6 +325,9 @@ public void create_doesNotRepairImplicitDirectories() throws IOException { .asList() .containsExactly(resourceId); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( uploadRequestString( @@ -341,6 +363,9 @@ public void create_correctlySetsContentType() throws IOException { .containsExactly("text/plain", "image/png", "application/octet-stream") .inOrder(); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( uploadRequestString( @@ -401,6 +426,9 @@ public void copy_withRewrite_multipleRequests() throws 
IOException { assertObjectContent(helperGcs, copiedResourceId, partition, partitionsCount); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( getBucketRequestString(resourceId.getBucketName()), @@ -452,6 +480,9 @@ public void create_gcsItemInfo_metadataEquals() throws IOException { assertThat(itemInfo.metadataEquals(itemInfo.getMetadata())).isTrue(); assertThat(itemInfo.metadataEquals(wrongMetadata)).isFalse(); + assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size()) + .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size()); + assertThat(trackingGcs.requestsTracker.getAllRequestStrings()) .containsExactly( uploadRequestString( diff --git a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java index 15c8d62f4c..a0d900f28b 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java @@ -31,6 +31,7 @@ import com.google.api.client.util.ExponentialBackOff; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor; import com.google.common.collect.ImmutableSet; import com.google.common.flogger.GoogleLogger; import com.google.common.flogger.LogContext; @@ -95,6 +96,7 @@ public void initialize(HttpRequest request) throws IOException { headers.setUserAgent(options.getDefaultUserAgent()); } headers.putAll(options.getHttpHeaders()); + request.setInterceptor(new InvocationIdInterceptor(request.getInterceptor())); } public Credentials getCredentials() { diff --git a/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java 
b/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java
new file mode 100644
index 0000000000..b765be7cc5
--- /dev/null
+++ b/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2022 Google LLC. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.hadoop.util.interceptors;
+
+import com.google.api.client.http.HttpExecuteInterceptor;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequest;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * HTTP request interceptor that attaches a unique identifier (the invocation ID) to each new
+ * request and keeps that identifier unchanged when the request is retried.
+ */
+@VisibleForTesting
+public final class InvocationIdInterceptor implements HttpExecuteInterceptor {
+
+  /** Prefix of the invocation ID entry in the {@link #GOOG_API_CLIENT} header value. */
+  public static final String GCCL_INVOCATION_ID_PREFIX = "gccl-invocation-id/";
+
+  /** Name of the header that carries the invocation ID entry. */
+  public static final String GOOG_API_CLIENT = "x-goog-api-client";
+
+  private final HttpExecuteInterceptor interceptor;
+
+  /**
+   * Creates an interceptor that wraps {@code interceptor}.
+   *
+   * @param interceptor interceptor to run before this one; may be {@code null}
+   */
+  public InvocationIdInterceptor(HttpExecuteInterceptor interceptor) {
+    this.interceptor = interceptor;
+  }
+
+  /**
+   * Runs the wrapped interceptor, if any, and then appends a {@code gccl-invocation-id/<uuid>}
+   * entry to the {@code x-goog-api-client} header.
+   *
+   * <p>The header is left untouched when it already carries an invocation ID (i.e. the request
+   * is being retried) or when the request URL contains {@code Signature=} (V2 and V4 signed
+   * URLs).
+   *
+   * @param request the request being intercepted
+   * @throws IOException if the wrapped interceptor throws it
+   */
+  @Override
+  public void intercept(HttpRequest request) throws IOException {
+    if (interceptor != null) {
+      interceptor.intercept(request);
+    }
+    HttpHeaders headers = request.getHeaders();
+    // A header may be stored as a repeated value (list or array); getFirstHeaderStringValue
+    // handles that case, whereas a plain (String) cast would throw ClassCastException.
+    String existing = headers.getFirstHeaderStringValue(GOOG_API_CLIENT);
+    if (isInvocationIdPresent(existing)) {
+      // An invocation ID is already present, so this is a retried request; keep its ID.
+      // TODO: add support for attempt_count
+      return;
+    }
+    // Replicates the logic of the `manual` client. Reference:
+    // https://github.com/googleapis/java-storage/blob/main/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java#L156-L177
+    final String signatureKey = "Signature="; // For V2 and V4 signedURLs
+    final String builtURL = request.getUrl().build();
+    if (builtURL.contains(signatureKey)) {
+      return;
+    }
+    String invocationEntry = GCCL_INVOCATION_ID_PREFIX + UUID.randomUUID();
+    String newValue =
+        (existing == null || existing.isEmpty())
+            ? invocationEntry
+            : existing + " " + invocationEntry;
+    headers.set(GOOG_API_CLIENT, newValue);
+  }
+
+  /** Returns whether the given header value already contains an invocation ID entry. */
+  private static boolean isInvocationIdPresent(String apiClientHeader) {
+    return apiClientHeader != null && apiClientHeader.contains(GCCL_INVOCATION_ID_PREFIX);
+  }
+}
diff --git a/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java b/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java
index 7db63ddfec..f637ce112e 100644
--- 
a/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java +++ b/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java @@ -28,6 +28,7 @@ import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.HttpStatusCodes; import com.google.auth.Credentials; +import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor; import com.google.cloud.hadoop.util.testing.FakeCredentials; import com.google.cloud.hadoop.util.testing.ThrowingInputStream; import com.google.common.collect.ImmutableList; @@ -67,6 +68,8 @@ public void successfulRequest_authenticated() throws IOException { HttpResponse res = req.execute(); assertThat(res).isNotNull(); + assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) + .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); } @@ -86,7 +89,8 @@ public void forbiddenResponse_failsWithoutRetries() throws IOException { "authorization", ImmutableList.of(authHeaderValue)); HttpResponseException thrown = assertThrows(HttpResponseException.class, req::execute); - + assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) + .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(thrown.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_FORBIDDEN); } @@ -116,7 +120,8 @@ private void errorCodeResponse_succeedsAfterRetries(int statusCode) throws Excep "authorization", ImmutableList.of(authHeaderValue)); HttpResponse res = req.execute(); - + assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) + .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res).isNotNull(); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); } @@ -142,7 +147,8 @@ public void ioExceptionResponse_succeedsAfterRetries() throws Exception { "authorization", 
ImmutableList.of(authHeaderValue)); HttpResponse res = req.execute(); - + assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) + .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res).isNotNull(); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); }