Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ public ConfigurationAsyncClient build() {

policies.add(new HttpLoggingPolicy(httpLogDetailLevel));

HttpPipeline pipeline = httpClient == null
? new HttpPipeline(policies)
: new HttpPipeline(httpClient, policies);
HttpPipeline pipeline = HttpPipeline.builder()
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.httpClient(httpClient)
.build();

return new ConfigurationAsyncClient(serviceEndpoint, pipeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,17 @@ public static HttpPipeline createDefaultPipeline(Class<?> swaggerInterface, Asyn
public static HttpPipeline createDefaultPipeline(Class<?> swaggerInterface, HttpPipelinePolicy credentialsPolicy) {
// Order in which policies applied will be the order in which they appear in the array
//
List<HttpPipelinePolicy> policies = new ArrayList<HttpPipelinePolicy>();
List<HttpPipelinePolicy> policies = new ArrayList<>();
policies.add(new UserAgentPolicy(getDefaultUserAgentString(swaggerInterface)));
policies.add(new RetryPolicy());
policies.add(new CookiePolicy());
if (credentialsPolicy != null) {
policies.add(credentialsPolicy);
}
return new HttpPipeline(policies.toArray(new HttpPipelinePolicy[policies.size()]));

return HttpPipeline.builder()
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,9 @@ public Mono<HttpResponse> send(HttpRequest request) {
}

private static <T> T createMockService(Class<T> serviceClass, MockAzureHttpClient httpClient) {
HttpPipeline pipeline = new HttpPipeline(httpClient);
HttpPipeline pipeline = HttpPipeline.builder()
.httpClient(httpClient)
.build();

return AzureProxy.create(serviceClass, null, pipeline, SERIALIZER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,9 @@ public void service18GetStatus500WithExpectedResponse500() {
}

private <T> T createService(Class<T> serviceClass) {
HttpPipeline pipeline = new HttpPipeline(createHttpClient());
HttpPipeline pipeline = HttpPipeline.builder()
.httpClient(createHttpClient())
.build();
//
return AzureProxy.create(serviceClass, null, pipeline, SERIALIZER);
}
Expand Down
96 changes: 28 additions & 68 deletions core/azure-core/src/main/java/com/azure/core/http/HttpPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

package com.azure.core.http;

import com.azure.core.util.Context;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.Context;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand All @@ -18,75 +17,47 @@ public final class HttpPipeline {
private final HttpClient httpClient;
private final HttpPipelinePolicy[] pipelinePolicies;

/**
* Creates a HttpPipeline holding array of policies that gets applied to all request initiated through
* {@link HttpPipeline#send(HttpPipelineCallContext)} and it's response.
*
* @param httpClient the http client to write request to wire and receive response from wire.
* @param pipelinePolicies pipeline policies in the order they need to applied, a copy of this array will
* be made hence changing the original array after the creation of pipeline
* will not mutate the pipeline
*/
public HttpPipeline(HttpClient httpClient, HttpPipelinePolicy... pipelinePolicies) {
Objects.requireNonNull(httpClient);
Objects.requireNonNull(pipelinePolicies);
this.pipelinePolicies = Arrays.copyOf(pipelinePolicies, pipelinePolicies.length);
this.httpClient = httpClient;
}

/**
* Creates a HttpPipeline holding array of policies that gets applied all request initiated through
* {@link HttpPipeline#send(HttpPipelineCallContext)} and it's response.
* Creates a builder that can configure options for the HttpPipeline before creating an instance of it.
*
* The default HttpClient {@link HttpClient#createDefault()} will be used to write request to wire and
* receive response from wire.
*
* @param pipelinePolicies pipeline policies in the order they need to applied, a copy of this array will
* be made hence changing the original array after the creation of pipeline
* will not mutate the pipeline
* @return A new {@link HttpPipelineBuilder} to create a HttpPipeline from.
*/
public HttpPipeline(HttpPipelinePolicy... pipelinePolicies) {
this(HttpClient.createDefault(), pipelinePolicies);
public static HttpPipelineBuilder builder() {
return new HttpPipelineBuilder();
}

/**
* Creates a HttpPipeline holding array of policies that gets applied to all request initiated through
* {@link HttpPipeline#send(HttpPipelineCallContext)} and it's response.
*
* @param httpClient the http client to write request to wire and receive response from wire.
* @param pipelinePolicies pipeline policies in the order they need to applied, a copy of this list
* will be made so changing the original list after the creation of pipeline
* will not mutate the pipeline
* @param pipelinePolicies pipeline policies in the order they need to applied, a copy of this array will
* be made hence changing the original array after the creation of pipeline
* will not mutate the pipeline
*/
public HttpPipeline(HttpClient httpClient, List<HttpPipelinePolicy> pipelinePolicies) {
HttpPipeline(HttpClient httpClient, List<HttpPipelinePolicy> pipelinePolicies) {
Objects.requireNonNull(httpClient);
Objects.requireNonNull(pipelinePolicies);
this.pipelinePolicies = pipelinePolicies.toArray(new HttpPipelinePolicy[0]);
this.httpClient = httpClient;
this.pipelinePolicies = pipelinePolicies.toArray(new HttpPipelinePolicy[0]);
}

/**
* Creates a HttpPipeline holding array of policies that gets applied all request initiated through
* {@link HttpPipeline#send(HttpPipelineCallContext)} and it's response.
*
* The default HttpClient {@link HttpClient#createDefault()} will be used to write request to wire and
* receive response from wire.
*
* @param pipelinePolicies pipeline policies in the order they need to applied, a copy of this list
* will be made so changing the original list after the creation of pipeline
* will not mutate the pipeline
* Get the policy at the passed index in the pipeline.
* @param index index of the the policy to retrieve.
* @return the policy stored at that index.
*/
public HttpPipeline(List<HttpPipelinePolicy> pipelinePolicies) {
this(HttpClient.createDefault(), pipelinePolicies);
public HttpPipelinePolicy getPolicy(final int index) {
return this.pipelinePolicies[index];
}

/**
* Get the policies in the pipeline.
*
* @return policies in the pipeline
* Get the count of policies in the pipeline.
* @return count of policies.
*/
public HttpPipelinePolicy[] pipelinePolicies() {
return Arrays.copyOf(this.pipelinePolicies, this.pipelinePolicies.length);
public int getPolicyCount() {
return this.pipelinePolicies.length;
}

/**
Expand All @@ -99,34 +70,23 @@ public HttpClient httpClient() {
}

/**
* Creates a new context local to the provided http request.
*
* @param httpRequest the request for a context needs to be created
* @return the request context
*/
public HttpPipelineCallContext newContext(HttpRequest httpRequest) {
return new HttpPipelineCallContext(httpRequest);
}

/**
* Creates a new context local to the provided http request.
* Wraps the request in a context and send it through pipeline.
*
* @param httpRequest the request for a context needs to be created
* @param data the data to associate with the context
* @return the request context
* @param request the request
* @return a publisher upon subscription flows the context through policies, sends the request and emits response upon completion
*/
public HttpPipelineCallContext newContext(HttpRequest httpRequest, Context data) {
return new HttpPipelineCallContext(httpRequest, data);
public Mono<HttpResponse> send(HttpRequest request) {
return this.send(new HttpPipelineCallContext(request));
}

/**
* Wraps the request in a context and send it through pipeline.
*
* Wraps the request in a context with additional metadata and sends it through the pipeline.
* @param request the request
* @param data additional metadata to pass along in the request
* @return a publisher upon subscription flows the context through policies, sends the request and emits response upon completion
*/
public Mono<HttpResponse> send(HttpRequest request) {
return this.send(this.newContext(request));
public Mono<HttpResponse> send(HttpRequest request, Context data) {
return this.send(new HttpPipelineCallContext(request, data));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http;

import com.azure.core.http.policy.HttpPipelinePolicy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* This class provides a fluent builder API to help aid the configuration and instantiation of the {@link HttpPipeline},
* calling {@link HttpPipelineBuilder#build() build} constructs an instance of the pipeline.
*
* <p>A pipeline is configured with a HttpClient that sends the request, if no client is set a default is used.
* A pipeline may be configured with a list of policies that are applied to each request.</p>
*
* <p><strong>Code Samples</strong></p>
*
* <p>Create a pipeline without configuration</p>
*
* <pre>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add header here about code samples, and some explanation text.

* HttpPipeline.builder()
* .build();
* </pre>
*
* <p>Create a pipeline using the default HTTP client and a retry policy</p>
*
* <pre>
* HttpPipeline.builder()
* .httpClient(HttpClient.createDefault())
* .policies(new RetryPolicy())
* .build();
* </pre>
*
* @see HttpPipeline
*/
public class HttpPipelineBuilder {
private HttpClient httpClient;
private List<HttpPipelinePolicy> pipelinePolicies;


HttpPipelineBuilder() {
}

/**
* Creates a {@link HttpPipeline} based on options set in the Builder. Every time {@code build()} is
* called, a new instance of {@link HttpPipeline} is created.
*
* If HttpClient is not set then the {@link HttpClient#createDefault() default HttpClient} is used.
*
* @return A HttpPipeline with the options set from the builder.
*/
public HttpPipeline build() {
List<HttpPipelinePolicy> policies = (pipelinePolicies == null) ? new ArrayList<>() : pipelinePolicies;
HttpClient client = (httpClient == null) ? HttpClient.createDefault() : httpClient;

return new HttpPipeline(client, policies);
}

/**
* Sets the HttpClient that the pipeline will use to send requests.
*
* @param httpClient The HttpClient the pipeline will use when sending requests.
* @return The updated HttpPipelineBuilder object.
*/
public HttpPipelineBuilder httpClient(HttpClient httpClient) {
this.httpClient = httpClient;
return this;
}

/**
* Adds {@link HttpPipelinePolicy policies} to the set of policies that the pipeline will use
* when sending requests.
*
* @param policies Policies to add to the policy set.
* @return The updated HttpPipelineBuilder object.
*/
public HttpPipelineBuilder policies(HttpPipelinePolicy... policies) {
if (pipelinePolicies == null) {
pipelinePolicies = new ArrayList<>();
}

this.pipelinePolicies.addAll(Arrays.asList(policies));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ public class HttpPipelineNextPolicy {
* @return a publisher upon subscription invokes next policy and emits response from the policy.
*/
public Mono<HttpResponse> process() {
final int size = this.pipeline.pipelinePolicies().length;
final int size = this.pipeline.getPolicyCount();
if (this.currentPolicyIndex > size) {
return Mono.error(new IllegalStateException("There is no more policies to execute."));
}

this.currentPolicyIndex++;
if (this.currentPolicyIndex == size) {
return this.pipeline.httpClient().send(this.context.httpRequest());
} else {
this.currentPolicyIndex++;
if (this.currentPolicyIndex == size) {
return this.pipeline.httpClient().send(this.context.httpRequest());
} else {
return this.pipeline.pipelinePolicies()[this.currentPolicyIndex].process(this.context, this);
}
return this.pipeline.getPolicy(this.currentPolicyIndex).process(this.context, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.core.annotations.ResumeOperation;
import com.azure.core.credentials.ServiceClientCredentials;
import com.azure.core.exception.HttpRequestException;
import com.azure.core.util.Context;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
Expand All @@ -34,6 +33,7 @@
import com.azure.core.implementation.util.FluxUtil;
import com.azure.core.implementation.util.ImplUtils;
import com.azure.core.implementation.util.TypeUtil;
import com.azure.core.util.Context;
import io.netty.buffer.ByteBuf;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -112,7 +112,7 @@ public SerializerAdapter serializer() {
* @return a {@link Mono} that emits HttpResponse asynchronously
*/
public Mono<HttpResponse> send(HttpRequest request, Context contextData) {
return httpPipeline.send(httpPipeline.newContext(request, contextData));
return httpPipeline.send(request, contextData);
}

@Override
Expand Down Expand Up @@ -558,7 +558,10 @@ public static HttpPipeline createDefaultPipeline(HttpPipelinePolicy credentialsP
if (credentialsPolicy != null) {
policies.add(credentialsPolicy);
}
return new HttpPipeline(policies.toArray(new HttpPipelinePolicy[policies.size()]));

return HttpPipeline.builder()
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public void basicCredentialsTest() throws Exception {
return next.process();
};
//
final HttpPipeline pipeline = new HttpPipeline(new MockHttpClient(),
new CredentialsPolicy(credentials),
auditorPolicy);
final HttpPipeline pipeline = HttpPipeline.builder()
.httpClient(new MockHttpClient())
.policies(new CredentialsPolicy(credentials), auditorPolicy)
.build();


HttpRequest request = new HttpRequest(HttpMethod.GET, new URL("http://localhost"));
Expand All @@ -47,9 +48,10 @@ public void tokenCredentialsTest() throws Exception {
return next.process();
};

final HttpPipeline pipeline = new HttpPipeline(new MockHttpClient(),
new CredentialsPolicy(credentials),
auditorPolicy);
final HttpPipeline pipeline = HttpPipeline.builder()
.httpClient(new MockHttpClient())
.policies(new CredentialsPolicy(credentials), auditorPolicy)
.build();

HttpRequest request = new HttpRequest(HttpMethod.GET, new URL("http://localhost"));
pipeline.send(request).block();
Expand Down
Loading