Skip to content

Commit

Permalink
Perform response deserialization in HttpPipeline (Azure#360)
Browse files Browse the repository at this point in the history
* Add DecodingPolicyFactory. Fix client-runtime tests.

* Fix test failures mainly by reverting to older impls

* Rename inner class

* Fix checkstyle

* Cleanup

* Remove unused methods

* RestResponse implements Closeable

* Add null check to DecodingPolicy

* Fixes based on self-review

* Fixes from review feedback
  • Loading branch information
RikkiGibson authored Feb 2, 2018
1 parent f3ff5bf commit 47f7cd1
Show file tree
Hide file tree
Showing 37 changed files with 683 additions and 457 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Token based credentials for use with a REST Service Client.
Expand All @@ -32,7 +30,6 @@ public final class AzureCliCredentials extends AzureTokenCredentials {
private Map<String, AzureCliSubscription> subscriptions;
private File azureProfile;
private File accessTokens;
private Lock lock = new ReentrantLock();

private AzureCliCredentials() {
super(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
import com.microsoft.rest.v2.http.HttpHeaders;
import com.microsoft.rest.v2.http.HttpResponse;
import com.microsoft.rest.v2.protocol.SerializerAdapter;
import com.microsoft.rest.v2.protocol.SerializerEncoding;
import com.microsoft.rest.v2.serializer.JacksonAdapter;
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MockHttpResponse extends HttpResponse {
private final static SerializerAdapter<?> serializer = new JacksonAdapter();
Expand Down Expand Up @@ -49,7 +48,7 @@ public MockHttpResponse(int statusCode, Object serializable) {
this(statusCode);

try {
this.string = serializer.serialize(serializable);
this.string = serializer.serialize(serializable, SerializerEncoding.JSON);
} catch (IOException e) {
e.printStackTrace();
}
Expand All @@ -70,11 +69,6 @@ public HttpHeaders headers() {
return new HttpHeaders(headers);
}

@Override
public Single<? extends InputStream> bodyAsInputStreamAsync() {
return Single.just(new ByteArrayInputStream(byteArray));
}

@Override
public Single<byte[]> bodyAsByteArrayAsync() {
return Single.just(byteArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ else if (pollingSucceeded) {
throw new IllegalStateException("Polling is completed and did not succeed. Cannot create a polling request.");
}

return new HttpRequest(fullyQualifiedMethodName(), HttpMethod.GET, pollUrl);
return new HttpRequest(fullyQualifiedMethodName(), HttpMethod.GET, pollUrl, createResponseDecoder());
}

@Override
Expand Down Expand Up @@ -149,11 +149,13 @@ public boolean isDone() {
* use when polling.
*/
static PollStrategy tryToCreate(RestProxy restProxy, SwaggerMethodParser methodParser, HttpRequest originalHttpRequest, HttpResponse httpResponse, long delayInMilliseconds) {
String urlHeader = getHeader(httpResponse);
URL azureAsyncOperationUrl = null;

try {
azureAsyncOperationUrl = new URL(getHeader(httpResponse));
} catch (MalformedURLException ignored) {
if (urlHeader != null) {
try {
azureAsyncOperationUrl = new URL(urlHeader);
} catch (MalformedURLException ignored) {
}
}

return azureAsyncOperationUrl != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.microsoft.rest.v2.SwaggerMethodParser;
import com.microsoft.rest.v2.http.HttpRequest;
import com.microsoft.rest.v2.http.HttpResponse;
import com.microsoft.rest.v2.protocol.SerializerEncoding;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
Expand Down Expand Up @@ -130,7 +131,7 @@ private static String javaVersion() {
return javaVersion;
}

private static <T> String getDefaultUserAgentString(Class<?> swaggerInterface) {
private static String getDefaultUserAgentString(Class<?> swaggerInterface) {
final String packageImplementationVersion = swaggerInterface == null ? "" : "/" + swaggerInterface.getPackage().getImplementationVersion();
final String operatingSystem = operatingSystem();
final String macAddressHash = macAddressHash();
Expand Down Expand Up @@ -382,7 +383,7 @@ public PollStrategy apply(String originalHttpResponseBody) {
PollStrategy result;
try {
final SerializerAdapter<?> serializer = serializer();
final ResourceWithProvisioningState resource = serializer.deserialize(originalHttpResponseBody, ResourceWithProvisioningState.class, SerializerAdapter.Encoding.JSON);
final ResourceWithProvisioningState resource = serializer.deserialize(originalHttpResponseBody, ResourceWithProvisioningState.class, SerializerEncoding.JSON);
if (resource != null && resource.properties() != null && !OperationState.isCompleted(resource.properties().provisioningState())) {
result = new ProvisioningStatePollStrategy(AzureProxy.this, methodParser, httpRequest, resource.properties().provisioningState(), delayInMilliseconds);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.microsoft.rest.v2.RestProxy;
import com.microsoft.rest.v2.SwaggerMethodParser;
import com.microsoft.rest.v2.http.BufferedHttpResponse;
import com.microsoft.rest.v2.http.HttpRequest;
import com.microsoft.rest.v2.http.HttpResponse;
import io.reactivex.Observable;
Expand All @@ -20,30 +21,29 @@
* further polling.
*/
public class CompletedPollStrategy extends PollStrategy {
private final HttpResponse bufferedOriginalHttpResponse;
private final BufferedHttpResponse firstHttpResponse;

/**
* Create a new CompletedPollStrategy.
* @param restProxy The RestProxy that created this PollStrategy.
* @param methodParser The method parser that describes the service interface method that
* initiated the long running operation.
* @param originalHttpResponse The HTTP response to the original HTTP request.
* @param firstHttpResponse The HTTP response to the original HTTP request.
*/
public CompletedPollStrategy(RestProxy restProxy, SwaggerMethodParser methodParser, HttpResponse originalHttpResponse) {
public CompletedPollStrategy(RestProxy restProxy, SwaggerMethodParser methodParser, HttpResponse firstHttpResponse) {
super(restProxy, methodParser, 0);

this.bufferedOriginalHttpResponse = originalHttpResponse.buffer();
this.firstHttpResponse = firstHttpResponse.buffer();
setStatus(OperationState.SUCCEEDED);
}

@Override
HttpRequest createPollRequest() {
return null;
throw new UnsupportedOperationException();
}

@Override
Single<HttpResponse> updateFromAsync(HttpResponse httpPollResponse) {
return null;
throw new UnsupportedOperationException();
}

@Override
Expand All @@ -52,10 +52,10 @@ boolean isDone() {
}

Observable<OperationStatus<Object>> pollUntilDoneWithStatusUpdates(final HttpRequest originalHttpRequest, final SwaggerMethodParser methodParser, final Type operationStatusResultType) {
return createOperationStatusObservable(originalHttpRequest, bufferedOriginalHttpResponse, methodParser, operationStatusResultType);
return createOperationStatusObservable(originalHttpRequest, firstHttpResponse, methodParser, operationStatusResultType);
}

Single<HttpResponse> pollUntilDone() {
return Single.just(bufferedOriginalHttpResponse);
return Single.<HttpResponse>just(firstHttpResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private LocationPollStrategy(RestProxy restProxy, SwaggerMethodParser methodPars

@Override
public HttpRequest createPollRequest() {
return new HttpRequest(fullyQualifiedMethodName(), HttpMethod.GET, locationUrl);
return new HttpRequest(fullyQualifiedMethodName(), HttpMethod.GET, locationUrl, createResponseDecoder());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import com.microsoft.rest.v2.SwaggerMethodParser;
import com.microsoft.rest.v2.http.HttpRequest;
import com.microsoft.rest.v2.http.HttpResponse;
import com.microsoft.rest.v2.protocol.SerializerAdapter.Encoding;
import com.microsoft.rest.v2.protocol.HttpResponseDecoder;
import com.microsoft.rest.v2.protocol.SerializerEncoding;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
Expand Down Expand Up @@ -42,7 +43,7 @@ abstract class PollStrategy {

@SuppressWarnings("unchecked")
protected <T> T deserialize(String value, Type returnType) throws IOException {
return (T) restProxy.deserialize(value, returnType, null, Encoding.JSON);
return (T) restProxy.serializer().deserialize(value, returnType, SerializerEncoding.JSON);
}

protected Single<HttpResponse> ensureExpectedStatus(HttpResponse httpResponse) {
Expand Down Expand Up @@ -121,6 +122,10 @@ void setStatus(String status) {
this.status = status;
}

protected final HttpResponseDecoder createResponseDecoder() {
return new HttpResponseDecoder(methodParser, restProxy.serializer());
}

/**
* Create a new HTTP poll request.
* @return A new HTTP poll request.
Expand Down Expand Up @@ -149,6 +154,7 @@ public Observable<HttpResponse> call() {
@Override
public Single<HttpResponse> call() throws Exception {
final HttpRequest pollRequest = createPollRequest();

return restProxy.sendHttpRequestAsync(pollRequest);
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* A PollStrategy that will continue to poll a resource's URL until the resource's provisioning
* state property is in a completed state.
*/
public class ProvisioningStatePollStrategy extends PollStrategy {
public final class ProvisioningStatePollStrategy extends PollStrategy {
private final HttpRequest originalRequest;
private final SwaggerMethodParser methodParser;

Expand All @@ -34,12 +34,12 @@ public class ProvisioningStatePollStrategy extends PollStrategy {

@Override
HttpRequest createPollRequest() {
return new HttpRequest(originalRequest.callerMethod(), HttpMethod.GET, originalRequest.url());
return new HttpRequest(originalRequest.callerMethod(), HttpMethod.GET, originalRequest.url(), createResponseDecoder());
}

@Override
Single<HttpResponse> updateFromAsync(HttpResponse httpPollResponse) {
return ensureExpectedStatus(httpPollResponse)
Single<HttpResponse> updateFromAsync(HttpResponse pollResponse) {
return ensureExpectedStatus(pollResponse)
.flatMap(new Function<HttpResponse, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> apply(HttpResponse response) {
Expand All @@ -48,31 +48,26 @@ public Single<HttpResponse> apply(HttpResponse response) {
.map(new Function<String, HttpResponse>() {
@Override
public HttpResponse apply(String responseBody) {
ResourceWithProvisioningState resource = null;
try {
resource = deserialize(responseBody, ResourceWithProvisioningState.class);
} catch (IOException ignored) {
}
ResourceWithProvisioningState resource = null;
try {
resource = deserialize(responseBody, ResourceWithProvisioningState.class);
} catch (IOException ignored) {
}

if (resource == null || resource.properties() == null || resource.properties().provisioningState() == null) {
if (methodParser.isExpectedResponseStatusCode(bufferedHttpPollResponse.statusCode())) {
setStatus(OperationState.SUCCEEDED);
} else {
setStatus(OperationState.FAILED);
}
}
else if (OperationState.isFailedOrCanceled(resource.properties().provisioningState())) {
throw new CloudException("Async operation failed with provisioning state: " + resource.properties().provisioningState(), bufferedHttpPollResponse);
}
else {
setStatus(resource.properties().provisioningState());
}
if (resource == null || resource.properties() == null || resource.properties().provisioningState() == null) {
throw new CloudException("The polling response does not contain a valid body", bufferedHttpPollResponse, null);
}
else if (OperationState.isFailedOrCanceled(resource.properties().provisioningState())) {
throw new CloudException("Async operation failed with provisioning state: " + resource.properties().provisioningState(), bufferedHttpPollResponse);
}
else {
setStatus(resource.properties().provisioningState());
}
return bufferedHttpPollResponse;
}
});
}
});

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.microsoft.rest.v2.RestException;
import com.microsoft.rest.v2.http.HttpRequest;
import com.microsoft.rest.v2.http.HttpResponse;
import com.microsoft.rest.v2.policy.DecodingPolicyFactory;
import com.microsoft.rest.v2.protocol.SerializerAdapter;
import com.microsoft.rest.v2.serializer.JacksonAdapter;
import com.microsoft.rest.v2.InvalidReturnTypeException;
Expand Down Expand Up @@ -771,7 +772,7 @@ public Single<HttpResponse> sendRequestAsync(HttpRequest request) {
}

private static <T> T createMockService(Class<T> serviceClass, MockAzureHttpClient httpClient) {
return AzureProxy.create(serviceClass, (AzureEnvironment) null, HttpPipeline.build(httpClient), serializer);
return AzureProxy.create(serviceClass, null, HttpPipeline.build(httpClient, new DecodingPolicyFactory()), serializer);
}

private static void assertContains(String value, String expectedSubstring) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.microsoft.rest.v2.http.HttpPipeline;
import com.microsoft.rest.v2.RestException;
import com.microsoft.rest.v2.RestResponse;
import com.microsoft.rest.v2.policy.DecodingPolicyFactory;
import com.microsoft.rest.v2.protocol.SerializerAdapter;
import com.microsoft.rest.v2.serializer.JacksonAdapter;
import com.microsoft.rest.v2.annotations.BodyParam;
Expand Down Expand Up @@ -138,36 +139,6 @@ public void AsyncGetRequestWithNoReturn() {
.blockingAwait();
}

@Host("http://httpbin.org")
private interface Service4 {
@GET("bytes/2")
@ExpectedResponses({200})
InputStream getByteStream();

@GET("bytes/2")
@ExpectedResponses({200})
Single<InputStream> getByteStreamAsync();
}

@Test
public void SyncGetRequestWithInputStreamReturn() throws IOException {
final InputStream byteStream = createService(Service4.class)
.getByteStream();
final byte[] buffer = new byte[10];
assertEquals(2, byteStream.read(buffer));
assertEquals(-1, byteStream.read(buffer));
}

@Test
public void AsyncGetRequestWithInputStreamReturn() throws IOException {
final InputStream byteStream = createService(Service4.class)
.getByteStreamAsync()
.blockingGet();
final byte[] buffer = new byte[10];
assertEquals(2, byteStream.read(buffer));
assertEquals(-1, byteStream.read(buffer));
}

@Host("http://httpbin.org")
private interface Service5 {
@GET("anything")
Expand Down Expand Up @@ -776,7 +747,7 @@ public void service18GetStatus500WithExpectedResponse500() {

private <T> T createService(Class<T> serviceClass) {
final HttpClient httpClient = createHttpClient();
return AzureProxy.create(serviceClass, (AzureEnvironment) null, HttpPipeline.build(httpClient), serializer);
return AzureProxy.create(serviceClass, null, HttpPipeline.build(httpClient, new DecodingPolicyFactory()), serializer);
}

private static void assertContains(String value, String expectedSubstring) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
package com.microsoft.azure.v2.http;

import com.google.common.base.Charsets;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.io.CharStreams;
import com.microsoft.azure.v2.AsyncOperationResource;
import com.microsoft.azure.v2.AzureAsyncOperationPollStrategy;
import com.microsoft.azure.v2.MockResource;
Expand All @@ -24,13 +21,9 @@
import com.microsoft.rest.v2.http.HttpResponse;
import com.microsoft.rest.v2.util.FlowableUtil;
import io.reactivex.Single;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Function;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
Expand Down
Loading

0 comments on commit 47f7cd1

Please sign in to comment.