|
3 | 3 | package com.azure.cosmos.implementation; |
4 | 4 |
|
5 | 5 | import com.azure.cosmos.ConsistencyLevel; |
6 | | -import com.azure.cosmos.CosmosContainerProactiveInitConfig; |
7 | | -import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; |
| 6 | +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest; |
| 7 | +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs; |
8 | 8 | import com.azure.cosmos.implementation.http.HttpClient; |
9 | | -import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; |
10 | | -import com.azure.cosmos.models.CosmosContainerIdentity; |
| 9 | +import com.azure.cosmos.implementation.http.HttpHeaders; |
| 10 | +import com.azure.cosmos.implementation.http.HttpRequest; |
| 11 | +import io.netty.buffer.ByteBuf; |
| 12 | +import io.netty.buffer.Unpooled; |
| 13 | +import io.netty.handler.codec.http.HttpMethod; |
11 | 14 | import reactor.core.publisher.Flux; |
12 | 15 | import reactor.core.publisher.Mono; |
13 | 16 |
|
| 17 | +import java.net.URI; |
14 | 18 | import java.util.HashMap; |
15 | | -import java.util.List; |
16 | 19 | import java.util.Map; |
17 | 20 |
|
18 | 21 | import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; |
19 | 22 |
|
20 | 23 | /** |
21 | 24 | * While this class is public, but it is not part of our published public APIs. |
22 | | - * This is meant to be internally used only by our sdk. |
| 25 | + * This is meant to be internally used only by our sdk. |
23 | 26 | * |
24 | 27 | * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. |
25 | 28 | */ |
@@ -76,4 +79,53 @@ protected Map<String, String> getDefaultHeaders( |
76 | 79 |
|
77 | 80 | return defaultHeaders; |
78 | 81 | } |
| 82 | + |
| 83 | + @Override |
| 84 | + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { |
| 85 | + |
| 86 | + // todo - neharao1 - validate b/w name() v/s toString() |
| 87 | + request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name()); |
| 88 | + |
| 89 | + // todo - neharao1: no concept of a replica / service endpoint that can be passed |
| 90 | + RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request); |
| 91 | + |
| 92 | + // todo - neharao1: validate what HTTP headers are needed - for now have put default ThinClient HTTP headers |
| 93 | + // todo - based on fabianm comment - thinClient also takes op type and resource type headers as HTTP headers |
| 94 | + HttpHeaders headers = this.getHttpHeaders(); |
| 95 | + |
| 96 | + RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs); |
| 97 | + |
| 98 | + // todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer |
| 99 | + ByteBuf byteBuf = Unpooled.buffer(); |
| 100 | + |
| 101 | + // todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept) |
| 102 | + // todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format) |
| 103 | + // todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy |
| 104 | + rntbdRequest.encode(byteBuf); |
| 105 | + |
| 106 | + return new HttpRequest( |
| 107 | + // todo: HttpMethod when using ThinClient is presumably always an HttpMethod.POST - validate this |
| 108 | + HttpMethod.POST, |
| 109 | + requestUri, |
| 110 | + requestUri.getPort(), |
| 111 | + headers, |
| 112 | + Flux.just(byteBuf.array())); |
| 113 | + } |
| 114 | + |
| 115 | + // todo: neharao1 - validate if RxGatewayStoreModel#unwrapToStoreResponse can be reused |
| 116 | +// @Override |
| 117 | +// public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) { |
| 118 | +// return null; |
| 119 | +// } |
| 120 | + |
| 121 | + private HttpHeaders getHttpHeaders() { |
| 122 | + HttpHeaders httpHeaders = new HttpHeaders(); |
| 123 | + Map<String, String> defaultHeaders = this.getDefaultHeaders(); |
| 124 | + |
| 125 | + for (Map.Entry<String, String> header : defaultHeaders.entrySet()) { |
| 126 | + httpHeaders.set(header.getKey(), header.getValue()); |
| 127 | + } |
| 128 | + |
| 129 | + return httpHeaders; |
| 130 | + } |
79 | 131 | } |
0 commit comments