Skip to content

Commit

Permalink
adding support for invocationId in Json path (#850)
Browse files Browse the repository at this point in the history
* adding support for invocationId in Json path

* addressed review comments

* addressed review comments

* addressed review comments

* addressed review comments
  • Loading branch information
singhravidutt committed Aug 18, 2022
1 parent db12a53 commit de53e52
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.cloud.hadoop.util.RetryHttpInitializerOptions;
import com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.ErrorResponses;
import com.google.common.collect.ImmutableMap;
import java.io.FileNotFoundException;
Expand All @@ -52,6 +54,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -496,6 +499,31 @@ public void afterRetry_subsequentReads_succeed() throws IOException {
assertThat(readChannel.generation()).isEqualTo(generation);
}

/** Checks that a request retried after a rate-limit error carries the same invocation id. */
@Test
public void retired_request_same_invocationId() throws IOException {
  long objectGeneration = 5L;
  // The first response is a retriable error and the second one is the successful metadata lookup.
  MockHttpTransport transport =
      mockTransport(
          jsonErrorResponse(ErrorResponses.RATE_LIMITED),
          jsonDataResponse(
              newStorageObject(BUCKET_NAME, OBJECT_NAME).setGeneration(objectGeneration)));
  RetryHttpInitializer retryInitializer =
      new RetryHttpInitializer(null, RetryHttpInitializerOptions.builder().build());
  TrackingHttpRequestInitializer requestsTracker =
      new TrackingHttpRequestInitializer(retryInitializer);
  Storage storage = new Storage(transport, GsonFactory.getDefaultInstance(), requestsTracker);

  GoogleCloudStorageReadChannel readChannel =
      createReadChannel(
          storage,
          GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build(),
          objectGeneration);

  assertThat(readChannel.size()).isNotEqualTo(0);

  List<String> recordedIds = requestsTracker.getAllRequestInvocationIds();
  // The request is retried exactly once, so two requests are recorded in total.
  assertThat(recordedIds).hasSize(2);
  // A retried request keeps its invocation id, so de-duplication leaves a single element.
  Set<String> distinctIds = Set.copyOf(recordedIds);
  assertThat(distinctIds).hasSize(1);
}

@Test
public void read_gzipEncoded_shouldReadAllBytes() throws IOException {
byte[] testData = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.GOOGLEAPIS_ENDPOINT;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.function.Predicate.not;

import com.google.api.client.http.HttpExecuteInterceptor;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -169,12 +173,36 @@ public ImmutableList<String> getAllRequestStrings() {
.collect(toImmutableList());
}

/**
 * Returns the invocation ids found in the headers of the tracked requests, in tracking order.
 * Requests for which no invocation id could be extracted are left out of the result.
 */
public ImmutableList<String> getAllRequestInvocationIds() {
  return requests.stream()
      .map(request -> request.getHeaders())
      .map(headers -> getInvocationId(headers))
      .filter(invocationId -> !Strings.isNullOrEmpty(invocationId))
      .collect(toImmutableList());
}

/**
 * Returns every tracked request rendered through {@code
 * GoogleCloudStorageIntegrationHelper.requestToString}, in tracking order.
 */
public ImmutableList<String> getAllRawRequestStrings() {
  return requests.stream()
      .map(request -> GoogleCloudStorageIntegrationHelper.requestToString(request))
      .collect(toImmutableList());
}

/**
 * Extracts the invocation id from the {@code x-goog-api-client} header of a request.
 *
 * <p>The header value is a list of {@code name/version} tokens, for example:
 *
 * <pre>
 * gl-java/11.0.12 gdcl/1.32.2 mac-os-x/12.5 gccl-invocation-id/9ad3804c-fdc1-4cb1-8337-5cf6ae1829b5
 * </pre>
 *
 * @param header headers of the tracked request
 * @return the text between the invocation id prefix and the nearest following space or comma (or
 *     the end of the header value), or {@code null} when the header is absent or carries no
 *     invocation id
 */
private String getInvocationId(HttpHeaders header) {
  String apiClientHeader = (String) header.get(InvocationIdInterceptor.GOOG_API_CLIENT);
  if (apiClientHeader == null) {
    // No header at all: report "no id" so that callers can filter it out instead of failing.
    return null;
  }
  int prefixIndex = apiClientHeader.indexOf(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
  if (prefixIndex < 0) {
    return null;
  }
  int beginIndex = prefixIndex + InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX.length();
  // The id ends at the nearest delimiter; a later delimiter belongs to a following token.
  int endIndex = apiClientHeader.length();
  for (char delimiter : new char[] {' ', ','}) {
    int delimiterIndex = apiClientHeader.indexOf(delimiter, beginIndex);
    if (delimiterIndex >= 0) {
      endIndex = Math.min(endIndex, delimiterIndex);
    }
  }
  return apiClientHeader.substring(beginIndex, endIndex);
}

private String replacePageTokenWithId(String request, AtomicLong pageTokenId) {
return replaceRequestParams
? replaceWithId(request, PAGE_TOKEN_PARAM_PATTERN, "pageToken=token_", pageTokenId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public void open_lazyInit_whenFastFailOnNotFound_isFalse() throws IOException {
assertThat(readChannel.size()).isEqualTo(expectedSize);
}

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
getRequestString(
Expand All @@ -127,6 +130,9 @@ public void open_withItemInfo() throws IOException {
assertThat(readChannel.size()).isEqualTo(expectedSize);
}

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings()).isEmpty();
}

Expand All @@ -147,6 +153,10 @@ public void writeLargeObject_withSmallUploadChunk() throws IOException {
partitionsCount);

assertObjectContent(helperGcs, resourceId, partition, partitionsCount);

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactlyElementsIn(
getExpectedRequestsForCreateObject(
Expand Down Expand Up @@ -217,6 +227,9 @@ public void writeObject_withNonAlignedUploadChunk() throws IOException {

assertObjectContent(helperGcs, resourceId, partition, partitionsCount);

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactlyElementsIn(
getExpectedRequestsForCreateObject(
Expand Down Expand Up @@ -254,6 +267,9 @@ public void conflictingWrites_noOverwrite_lastFails() throws IOException {

assertObjectContent(helperGcs, resourceId, bytesToWrite, /* expectedBytesCount= */ 1);

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
getRequestString(resourceId.getBucketName(), resourceId.getObjectName()),
Expand All @@ -269,6 +285,9 @@ public void conflictingWrites_noOverwrite_lastFails() throws IOException {
/* uploadId= */ 1))
.inOrder();

assertThat(trackingGcs2.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs2.requestsTracker.getAllRequests().size());

assertThat(trackingGcs2.requestsTracker.getAllRequestStrings())
.containsExactly(
getRequestString(resourceId.getBucketName(), resourceId.getObjectName()),
Expand Down Expand Up @@ -306,6 +325,9 @@ public void create_doesNotRepairImplicitDirectories() throws IOException {
.asList()
.containsExactly(resourceId);

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
uploadRequestString(
Expand Down Expand Up @@ -341,6 +363,9 @@ public void create_correctlySetsContentType() throws IOException {
.containsExactly("text/plain", "image/png", "application/octet-stream")
.inOrder();

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
uploadRequestString(
Expand Down Expand Up @@ -401,6 +426,9 @@ public void copy_withRewrite_multipleRequests() throws IOException {

assertObjectContent(helperGcs, copiedResourceId, partition, partitionsCount);

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
getBucketRequestString(resourceId.getBucketName()),
Expand Down Expand Up @@ -452,6 +480,9 @@ public void create_gcsItemInfo_metadataEquals() throws IOException {
assertThat(itemInfo.metadataEquals(itemInfo.getMetadata())).isTrue();
assertThat(itemInfo.metadataEquals(wrongMetadata)).isFalse();

assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
.isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());

assertThat(trackingGcs.requestsTracker.getAllRequestStrings())
.containsExactly(
uploadRequestString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.client.util.ExponentialBackOff;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.flogger.LogContext;
Expand Down Expand Up @@ -95,6 +96,7 @@ public void initialize(HttpRequest request) throws IOException {
headers.setUserAgent(options.getDefaultUserAgent());
}
headers.putAll(options.getHttpHeaders());
request.setInterceptor(new InvocationIdInterceptor(request.getInterceptor()));
}

public Credentials getCredentials() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2022 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.util.interceptors;

import com.google.api.client.http.HttpExecuteInterceptor;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequest;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.UUID;

/**
 * {@link HttpExecuteInterceptor} that tags each new request with a unique identifier (the
 * invocation id) inside the {@code x-goog-api-client} header. A request that already carries an
 * invocation id is left untouched, so a retried request keeps the identifier of its first attempt.
 */
@VisibleForTesting
public final class InvocationIdInterceptor implements HttpExecuteInterceptor {

  /** Prefix of the invocation id token inside the {@link #GOOG_API_CLIENT} header value. */
  public static final String GCCL_INVOCATION_ID_PREFIX = "gccl-invocation-id/";

  /** Name of the header that carries the invocation id token. */
  public static final String GOOG_API_CLIENT = "x-goog-api-client";

  /** Marker found in the URL of V2 and V4 signed URLs. */
  private static final String SIGNATURE_KEY = "Signature=";

  /** Interceptor that runs before this one; may be {@code null}. */
  private final HttpExecuteInterceptor interceptor;

  /**
   * Creates an interceptor that wraps an existing one.
   *
   * @param interceptor interceptor to invoke first on every request, or {@code null} if none
   */
  public InvocationIdInterceptor(HttpExecuteInterceptor interceptor) {
    this.interceptor = interceptor;
  }

  /**
   * Runs the wrapped interceptor (if any), then appends an invocation id token to the {@code
   * x-goog-api-client} header unless one is already present or the request URL is a signed URL.
   *
   * @param request request about to be executed
   * @throws IOException if the wrapped interceptor fails
   */
  @Override
  public void intercept(HttpRequest request) throws IOException {
    if (interceptor != null) {
      interceptor.intercept(request);
    }

    HttpHeaders headers = request.getHeaders();
    String currentValue = (String) headers.get(GOOG_API_CLIENT);
    boolean alreadyTagged =
        currentValue != null && currentValue.contains(GCCL_INVOCATION_ID_PREFIX);
    if (alreadyTagged) {
      // An invocation id is already present, so this is a retried request.
      // TODO: add support for attempt_count
      return;
    }

    // Replicating the logic from `manual` client.
    // reference:
    // https://github.com/googleapis/java-storage/blob/main/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java#L156-L177
    if (request.getUrl().build().contains(SIGNATURE_KEY)) {
      return;
    }

    String invocationEntry = GCCL_INVOCATION_ID_PREFIX + UUID.randomUUID();
    boolean headerIsBlank = currentValue == null || currentValue.isEmpty();
    headers.set(
        GOOG_API_CLIENT, headerIsBlank ? invocationEntry : currentValue + " " + invocationEntry);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.auth.Credentials;
import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor;
import com.google.cloud.hadoop.util.testing.FakeCredentials;
import com.google.cloud.hadoop.util.testing.ThrowingInputStream;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -67,6 +68,8 @@ public void successfulRequest_authenticated() throws IOException {
HttpResponse res = req.execute();

assertThat(res).isNotNull();
assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT))
.contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK);
}

Expand All @@ -86,7 +89,8 @@ public void forbiddenResponse_failsWithoutRetries() throws IOException {
"authorization", ImmutableList.of(authHeaderValue));

HttpResponseException thrown = assertThrows(HttpResponseException.class, req::execute);

assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT))
.contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
assertThat(thrown.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_FORBIDDEN);
}

Expand Down Expand Up @@ -116,7 +120,8 @@ private void errorCodeResponse_succeedsAfterRetries(int statusCode) throws Excep
"authorization", ImmutableList.of(authHeaderValue));

HttpResponse res = req.execute();

assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT))
.contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
assertThat(res).isNotNull();
assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK);
}
Expand All @@ -142,7 +147,8 @@ public void ioExceptionResponse_succeedsAfterRetries() throws Exception {
"authorization", ImmutableList.of(authHeaderValue));

HttpResponse res = req.execute();

assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT))
.contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
assertThat(res).isNotNull();
assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK);
}
Expand Down

0 comments on commit de53e52

Please sign in to comment.