Skip to content

Commit 687ec2d

Browse files
committed
Address PR comment
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent 9b1414e commit 687ec2d

File tree

7 files changed

+37
-38
lines changed

7 files changed

+37
-38
lines changed

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ClientHeaderMiddleware.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
*/
3535
class ClientHeaderMiddleware implements FlightClientMiddleware {
3636
static final String RAW_HEADER_KEY = "raw-header";
37-
static final String REQUEST_ID_KEY = "req-id";
37+
static final String CORRELATION_ID_KEY = "correlation-id";
3838

3939
private final HeaderContext context;
4040
private final Version version;
@@ -60,13 +60,13 @@ class ClientHeaderMiddleware implements FlightClientMiddleware {
6060
@Override
6161
public void onHeadersReceived(CallHeaders incomingHeaders) {
6262
String encodedHeader = incomingHeaders.get(RAW_HEADER_KEY);
63-
String reqId = incomingHeaders.get(REQUEST_ID_KEY);
63+
String correlationId = incomingHeaders.get(CORRELATION_ID_KEY);
6464

6565
if (encodedHeader == null) {
6666
throw new StreamException(StreamErrorCode.INVALID_ARGUMENT, "Missing required header: " + RAW_HEADER_KEY);
6767
}
68-
if (reqId == null) {
69-
throw new StreamException(StreamErrorCode.INVALID_ARGUMENT, "Missing required header: " + REQUEST_ID_KEY);
68+
if (correlationId == null) {
69+
throw new StreamException(StreamErrorCode.INVALID_ARGUMENT, "Missing required header: " + CORRELATION_ID_KEY);
7070
}
7171

7272
try {
@@ -87,12 +87,11 @@ public void onHeadersReceived(CallHeaders incomingHeaders) {
8787
}
8888

8989
// Store the header in context for later retrieval
90-
long requestId = Long.parseLong(reqId);
91-
context.setHeader(requestId, header);
90+
context.setHeader(Long.parseLong(correlationId), header);
9291
} catch (IOException e) {
9392
throw new StreamException(StreamErrorCode.INTERNAL, "Failed to decode header", e);
9493
} catch (NumberFormatException e) {
95-
throw new StreamException(StreamErrorCode.INVALID_ARGUMENT, "Invalid request ID format: " + reqId, e);
94+
throw new StreamException(StreamErrorCode.INVALID_ARGUMENT, "Invalid request ID format: " + correlationId, e);
9695
}
9796
}
9897

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
class FlightClientChannel implements TcpChannel {
4949
private static final Logger logger = LogManager.getLogger(FlightClientChannel.class);
50-
private final AtomicLong requestIdGenerator = new AtomicLong();
50+
private final AtomicLong correlationIdGenerator = new AtomicLong();
5151
private final FlightClient client;
5252
private final DiscoveryNode node;
5353
private final BoundTransportAddress boundAddress;
@@ -199,7 +199,7 @@ public InetSocketAddress getRemoteAddress() {
199199
}
200200

201201
@Override
202-
public void sendMessage(long reqId, BytesReference reference, ActionListener<Void> listener) {
202+
public void sendMessage(long requestId, BytesReference reference, ActionListener<Void> listener) {
203203
if (!isOpen()) {
204204
listener.onFailure(new StreamException(StreamErrorCode.UNAVAILABLE, "FlightClientChannel is closed"));
205205
return;
@@ -214,8 +214,8 @@ public void sendMessage(long reqId, BytesReference reference, ActionListener<Voi
214214
try {
215215
// ticket will contain the serialized headers
216216
Ticket ticket = serializeToTicket(reference);
217-
TransportResponseHandler<?> handler = responseHandlers.onResponseReceived(reqId, messageListener);
218-
long correlationId = requestIdGenerator.incrementAndGet();
217+
TransportResponseHandler<?> handler = responseHandlers.onResponseReceived(requestId, messageListener);
218+
long correlationId = correlationIdGenerator.incrementAndGet();
219219

220220
if (callTracker != null) {
221221
handler = new MetricsTrackingResponseHandler<>(handler, callTracker);
@@ -243,7 +243,7 @@ public void sendMessage(long reqId, BytesReference reference, ActionListener<Voi
243243

244244
@Override
245245
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
246-
throw new IllegalStateException("sendMessage must be accompanied with reqId for FlightClientChannel, use the right variant.");
246+
throw new IllegalStateException("sendMessage must be accompanied with requestId for FlightClientChannel, use the right variant.");
247247
}
248248

249249
private void processStreamResponse(FlightTransportResponse<?> streamResponse) {

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.io.IOException;
2929
import java.util.Objects;
3030

31-
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.REQUEST_ID_KEY;
31+
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.CORRELATION_ID_KEY;
3232

3333
/**
3434
* Arrow Flight implementation of streaming transport responses.
@@ -42,7 +42,7 @@ class FlightTransportResponse<T extends TransportResponse> implements StreamTran
4242
private final FlightStream flightStream;
4343
private final NamedWriteableRegistry namedWriteableRegistry;
4444
private final HeaderContext headerContext;
45-
private final long reqId;
45+
private final long correlationId;
4646
private final FlightTransportConfig config;
4747

4848
private final TransportResponseHandler<T> handler;
@@ -62,21 +62,21 @@ class FlightTransportResponse<T extends TransportResponse> implements StreamTran
6262
*/
6363
public FlightTransportResponse(
6464
TransportResponseHandler<T> handler,
65-
long reqId,
65+
long correlationId,
6666
FlightClient flightClient,
6767
HeaderContext headerContext,
6868
Ticket ticket,
6969
NamedWriteableRegistry namedWriteableRegistry,
7070
FlightTransportConfig config
7171
) {
7272
this.handler = handler;
73-
this.reqId = reqId;
73+
this.correlationId = correlationId;
7474
this.headerContext = Objects.requireNonNull(headerContext, "headerContext must not be null");
7575
this.namedWriteableRegistry = namedWriteableRegistry;
7676
this.config = config;
77-
// Initialize Flight stream with request ID header
77+
// Initialize Flight stream with correlation ID header
7878
FlightCallHeaders callHeaders = new FlightCallHeaders();
79-
callHeaders.insert(REQUEST_ID_KEY, String.valueOf(reqId));
79+
callHeaders.insert(CORRELATION_ID_KEY, String.valueOf(correlationId));
8080
HeaderCallOption callOptions = new HeaderCallOption(callHeaders);
8181
this.flightStream = flightClient.getStream(ticket, callOptions);
8282

@@ -118,7 +118,7 @@ public T nextResponse() {
118118

119119
if (flightStream.next()) {
120120
currentRoot = flightStream.getRoot();
121-
currentHeader = headerContext.getHeader(reqId);
121+
currentHeader = headerContext.getHeader(correlationId);
122122
// Capture the batch size before deserialization
123123
currentBatchSize = FlightUtils.calculateVectorSchemaRootSize(currentRoot);
124124
return deserializeResponse();
@@ -202,7 +202,7 @@ private synchronized void initializeStreamIfNeeded() {
202202
try {
203203
if (flightStream.next()) {
204204
currentRoot = flightStream.getRoot();
205-
currentHeader = headerContext.getHeader(reqId);
205+
currentHeader = headerContext.getHeader(correlationId);
206206
// Capture the batch size before deserialization
207207
currentBatchSize = FlightUtils.calculateVectorSchemaRootSize(currentRoot);
208208
streamInitialized = true;
@@ -212,13 +212,13 @@ private synchronized void initializeStreamIfNeeded() {
212212
} catch (FlightRuntimeException e) {
213213
// TODO maybe add a check - handshake and validate if node is connected
214214
// Try to get headers even if stream failed
215-
currentHeader = headerContext.getHeader(reqId);
215+
currentHeader = headerContext.getHeader(correlationId);
216216
streamExhausted = true;
217217
initializationException = FlightErrorMapper.fromFlightException(e);
218218
logger.warn("Stream initialization failed", e);
219219
} catch (Exception e) {
220220
// Try to get headers even if stream failed
221-
currentHeader = headerContext.getHeader(reqId);
221+
currentHeader = headerContext.getHeader(correlationId);
222222
streamExhausted = true;
223223
initializationException = new StreamException(StreamErrorCode.INTERNAL, "Stream initialization failed", e);
224224
logger.warn("Stream initialization failed", e);

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/HeaderContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
class HeaderContext {
1616
private final ConcurrentHashMap<Long, Header> headerMap = new ConcurrentHashMap<>();
1717

18-
void setHeader(long reqId, Header header) {
19-
headerMap.put(reqId, header);
18+
void setHeader(long correlationId, Header header) {
19+
headerMap.put(correlationId, header);
2020
}
2121

22-
Header getHeader(long reqId) {
23-
return headerMap.remove(reqId);
22+
Header getHeader(long correlationId) {
23+
return headerMap.remove(correlationId);
2424
}
2525
}

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ServerHeaderMiddleware.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import java.nio.ByteBuffer;
1818
import java.util.Base64;
1919

20+
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.CORRELATION_ID_KEY;
2021
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.RAW_HEADER_KEY;
21-
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.REQUEST_ID_KEY;
2222

2323
/**
2424
* ServerHeaderMiddleware is created per call to handle the response header
@@ -27,10 +27,10 @@
2727
*/
2828
class ServerHeaderMiddleware implements FlightServerMiddleware {
2929
private ByteBuffer headerBuffer;
30-
private final String reqId;
30+
private final String requestId;
3131

32-
ServerHeaderMiddleware(String reqId) {
33-
this.reqId = reqId;
32+
ServerHeaderMiddleware(String requestId) {
33+
this.requestId = requestId;
3434
}
3535

3636
void setHeader(ByteBuffer headerBuffer) {
@@ -44,11 +44,11 @@ public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
4444
headerBuffer.get(headerBytes);
4545
String encodedHeader = Base64.getEncoder().encodeToString(headerBytes);
4646
outgoingHeaders.insert(RAW_HEADER_KEY, encodedHeader);
47-
outgoingHeaders.insert(REQUEST_ID_KEY, reqId);
47+
outgoingHeaders.insert(CORRELATION_ID_KEY, requestId);
4848
headerBuffer.rewind();
4949
} else {
5050
outgoingHeaders.insert(RAW_HEADER_KEY, "");
51-
outgoingHeaders.insert(REQUEST_ID_KEY, reqId);
51+
outgoingHeaders.insert(CORRELATION_ID_KEY, requestId);
5252
}
5353
}
5454

@@ -61,8 +61,8 @@ public void onCallErrored(Throwable err) {}
6161
public static class Factory implements FlightServerMiddleware.Factory<ServerHeaderMiddleware> {
6262
@Override
6363
public ServerHeaderMiddleware onCallStarted(CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) {
64-
String reqId = incomingHeaders.get(REQUEST_ID_KEY);
65-
return new ServerHeaderMiddleware(reqId);
64+
String requestId = incomingHeaders.get(CORRELATION_ID_KEY);
65+
return new ServerHeaderMiddleware(requestId);
6666
}
6767
}
6868
}

server/src/main/java/org/opensearch/transport/TcpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,11 @@ public interface TcpChannel extends CloseableChannel {
8888
* Sends a tcp message to the channel. The listener will be executed once the send process has been
8989
* completed.
9090
*
91-
* @param reqId request Id
91+
* @param requestId request Id
9292
* @param reference to send to channel
9393
* @param listener to execute upon send completion
9494
*/
95-
default void sendMessage(long reqId, BytesReference reference, ActionListener<Void> listener) {
95+
default void sendMessage(long requestId, BytesReference reference, ActionListener<Void> listener) {
9696
sendMessage(reference, listener);
9797
}
9898

server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,11 @@ public void sendErrorResponse(
180180
sendMessage(requestId, channel, message, listener);
181181
}
182182

183-
private void sendMessage(long reqId, TcpChannel channel, NativeOutboundMessage networkMessage, ActionListener<Void> listener)
183+
private void sendMessage(long requestId, TcpChannel channel, NativeOutboundMessage networkMessage, ActionListener<Void> listener)
184184
throws IOException {
185185
MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
186186
OutboundHandler.SendContext sendContext = new OutboundHandler.SendContext(statsTracker, channel, serializer, listener, serializer);
187-
handler.sendBytes(reqId, channel, sendContext);
187+
handler.sendBytes(requestId, channel, sendContext);
188188
}
189189

190190
@Override

0 commit comments

Comments
 (0)