-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Stream Transport implementation using Arrow Flight #18424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream Transport implementation using Arrow Flight #18424
Conversation
|
❌ Gradle check result for 9a8ac93: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 6c0eb1f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 4927302: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 8818f6e: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
3ede3a2 to
fa7a38b
Compare
fa7a38b to
4461c75
Compare
|
❌ Gradle check result for 4461c75: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 4461c75: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 2233764: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
c256b09 to
f26c9f8
Compare
|
❌ Gradle check result for f26c9f8: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for f0d0c75: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 46afedd: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for f09dc7f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
3254ed5 to
70af3e2
Compare
|
❌ Gradle check result for 70af3e2: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
70af3e2 to
a247bbf
Compare
|
❌ Gradle check result for a247bbf: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
a247bbf to
237d39e
Compare
Signed-off-by: Rishabh Maurya <[email protected]>
|
❌ Gradle check result for 275ad4d: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❕ Gradle check result for 275ad4d: UNSTABLE Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure. |
|
❌ Gradle check result for 9b1414e: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
server/src/main/java/org/opensearch/transport/nativeprotocol/NativeOutboundHandler.java
Outdated
Show resolved
Hide resolved
...ns/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/stats/FlightStatsResponse.java
Show resolved
Hide resolved
Signed-off-by: Rishabh Maurya <[email protected]>
a841010 to
687ec2d
Compare
|
❌ Gradle check result for 687ec2d: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for 687ec2d: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
…t#18424) * vectorized version of StreamInput and StreamOutput Signed-off-by: Rishabh Maurya <[email protected]> * Fix for the fetch phase optimization Signed-off-by: Rishabh Maurya <[email protected]> * Fix issues at flight transport layer; Add middleware for header management Signed-off-by: Rishabh Maurya <[email protected]> * Fix race condition with header in flight transport Signed-off-by: Rishabh Maurya <[email protected]> * Refactor; gradle check fixes Signed-off-by: Rishabh Maurya <[email protected]> * Add stats API Signed-off-by: Rishabh Maurya <[email protected]> * Stats API refactor; Cancellation of stream through StreamTransportResponse Signed-off-by: Rishabh Maurya <[email protected]> * Added base test class for stream transport and tests for FlightClientChannel Signed-off-by: Rishabh Maurya <[email protected]> * Fix tests due to null stream transport passed to StubbableTransport Signed-off-by: Rishabh Maurya <[email protected]> * Fix the failing tests due to connection profile missing STREAM type Signed-off-by: Rishabh Maurya <[email protected]> * cancellation and timeout fixes; fixes for resource cleanup; more tests; documentation update Signed-off-by: Rishabh Maurya <[email protected]> * Increase latch await time for early cancellation test to fix flakiness Signed-off-by: Rishabh Maurya <[email protected]> * improve javadocs; code refactor Signed-off-by: Rishabh Maurya <[email protected]> * fix issues in flight client channel; added docs on usage; standardize the exceptions Signed-off-by: Rishabh Maurya <[email protected]> * pass along request Id from OutboundHandler to TcpChannel; refactor FlightTransportResponse for header management; more tests; update docs Signed-off-by: Rishabh Maurya <[email protected]> * code coverage Signed-off-by: Rishabh Maurya <[email protected]> * API changes for stream transport * extensibility for transport classes * StreamTransport and StreamTransportService implementation * streaming based search action Signed-off-by: Rishabh Maurya <[email protected]> * update docs Signed-off-by: Rishabh Maurya <[email protected]> * Standardize error handling Signed-off-by: Rishabh Maurya <[email protected]> * stream transport metrics and integration Signed-off-by: Rishabh Maurya <[email protected]> * unit tests for metrics Signed-off-by: Rishabh Maurya <[email protected]> * Fixes related to security and FGAC Signed-off-by: Rishabh Maurya <[email protected]> * Chaos IT and fixes on resource leaks like reader context cleanup after search Signed-off-by: Rishabh Maurya <[email protected]> * register stream default timeout setting Signed-off-by: Rishabh Maurya <[email protected]> * test stability and latch timeout settings Signed-off-by: Rishabh Maurya <[email protected]> * pr comment: nitpick Signed-off-by: Rishabh Maurya <[email protected]> * aggregation ser/de changes not required anymore Signed-off-by: Rishabh Maurya <[email protected]> * Add changelog Signed-off-by: Rishabh Maurya <[email protected]> * Allow flight server to bind to multiple addresses Signed-off-by: Rishabh Maurya <[email protected]> * example plugin to demonstrate defining stream based transport action Signed-off-by: Rishabh Maurya <[email protected]> * support for slow logs, remove unnecessary thread switch to flight client Signed-off-by: Rishabh Maurya <[email protected]> * Make FlightServerChannel threadsafe Signed-off-by: Rishabh Maurya <[email protected]> * Allocator related tuning Signed-off-by: Rishabh Maurya <[email protected]> * Attempt to fix flaky metric test Signed-off-by: Rishabh Maurya <[email protected]> * Improve test coverage Signed-off-by: Rishabh Maurya <[email protected]> * fix documentation Signed-off-by: Rishabh Maurya <[email protected]> * Add @experimentalapi annotation Signed-off-by: Rishabh Maurya <[email protected]> * Share TaskManager and remoteClientService between stream and regular transport service Signed-off-by: Rishabh Maurya <[email protected]> * fix tests Signed-off-by: Rishabh Maurya <[email protected]> * address pr comment Signed-off-by: Rishabh Maurya <[email protected]> * fix test Signed-off-by: Rishabh Maurya <[email protected]> * Update documentation Signed-off-by: Rishabh Maurya <[email protected]> * Fix synchronization with multiple batches written concurrently at server Signed-off-by: Rishabh Maurya <[email protected]> * Address PR comment Signed-off-by: Rishabh Maurya <[email protected]> --------- Signed-off-by: Rishabh Maurya <[email protected]>
Description
Stream transport (#18722 | RFC: #18425) implementation using Arrow Flight RPC. This is in continuation to #18722 and currently contains all its diff as well.
Key features -
Testing
The whole transport is behind a feature flag, so no impact when plugin is not installed or feature flag is disabled (default).
Tests pending -
I have found the whole transport layer in OpenSearch to be quite complicated and this is a giant change. Thus, I'm sharing my cheat sheet for the reviewers (or anyone interested in OpenSearch's transport) to understand the existing flow and how the flow is modified to support stream transport using Flight in this PR -
sequenceDiagram participant Client participant TS as TransportService participant CM as ConnectionManager participant C as Connection participant TC as TcpChannel<br/>(Netty4TcpChannel) participant NOH as NativeOutboundHandler participant N as Network Note over Client,N: Netty4 Flow Client->>TS: Send TransportRequest TS->>TS: Generate reqID TS->>CM: Get Connection CM->>C: Provide Connection C->>TC: Use Channel TC->>NOH: Serialize to BytesReference<br/>(StreamOutput) with reqID NOH->>N: Send BytesReference participant Client2 participant STS as StreamTransportService participant CM2 as ConnectionManager participant C2 as Connection participant FTC as FlightTcpChannel participant FMH as FlightMessageHandler participant FC as FlightClientChannel participant N2 as Network Note over Client2,N2: Flight Flow Client2->>STS: Send TransportRequest STS->>STS: Generate reqID STS->>CM2: Get Connection CM2->>C2: Provide Connection C2->>FTC: Use Channel FTC->>FMH: Serialize to Flight Ticket<br/>(ArrowStreamOutput) with reqID FMH->>FC: Send Flight Ticket FC->>N2: Transmit Request2. Inbound Server: Netty4 vs. Flight
sequenceDiagram participant STC as Server TcpChannel<br/>(Netty4TcpChannel) participant IP as InboundPipeline participant IH as InboundHandler participant NMH as NativeMessageHandler participant RH as RequestHandler Note over STC,RH: Netty4 Flow STC->>IP: Receive BytesReference IP->>IH: Deserialize to InboundMessage<br/>(StreamInput) IH->>NMH: Interpret as TransportRequest NMH->>RH: Process Request participant FS as FlightServer participant FP as FlightProducer participant IP2 as InboundPipeline participant IH2 as InboundHandler participant NMH2 as NativeMessageHandler participant RH2 as RequestHandler Note over FS,RH2: Flight Flow FS->>FP: Receive Flight Ticket FP->>FP: Create VectorSchemaRoot FP->>FP: Create FlightServerChannel FP->>IP2: Pass to InboundPipeline IP2->>IH2: Deserialize with ArrowStreamInput IH2->>NMH2: Interpret as TransportRequest NMH2->>RH2: Process Request3. Outbound Server: Netty4 vs. Flight
sequenceDiagram participant RH as RequestHandler participant OH as OutboundHandler participant TTC as TcpTransportChannel participant TC as TcpChannel Note over RH,TC: Netty4 Flow RH->>TTC: sendResponse(TransportResponse) TTC->>OH: Serialize TransportResponse<br/>(via sendResponse) OH->>TC: Send Serialized Data to Client participant RH2 as RequestHandler participant FTC as FlightTransportChannel participant FOH as FlightOutboundHandler participant FSC as FlightServerChannel participant SSL as ServerStreamListener Note over RH2,SSL: Flight Flow RH2->>FTC: sendResponseBatch(TransportResponse) FTC->>FOH: sendResponseBatch FOH->>FSC: sendBatch(VectorSchemaRoot) FSC->>SSL: start(root) (first batch) FSC->>SSL: putNext() (stream batch) RH2->>FTC: completeStream() FTC->>FOH: completeStream FOH->>FSC: completeStream FSC->>SSL: completed() (end stream)4. Inbound Client: Netty4 vs. Flight
sequenceDiagram participant CTC as Client TcpChannel<br/>(Netty4TcpChannel) participant CIP as Client InboundPipeline participant CIH as Client InboundHandler participant RH as ResponseHandler Note over CTC,RH: Netty4 Flow CTC->>CIP: Receive BytesReference CIP->>CIH: Deserialize to TransportResponse<br/>(StreamInput) CIH->>RH: Deliver Response participant FC as FlightClient participant FCC as FlightClientChannel participant FTR as FlightTransportResponse participant RH2 as ResponseHandler Note over FC,RH2: Flight Flow (Async Response Handling) FC->>FCC: handleInboundStream(Ticket, Listener) FCC->>FTR: Create FlightTransportResponse FCC->>FCC: Retrieve Header and reqID FCC->>RH2: Get TransportResponseHandler<br/>using reqID FCC->>RH2: handler.handleStreamResponse(streamResponse)<br/>(Async Processing)Related Issues
Resolves #[Issue number to be closed when this PR is merged]
#18425
#17695
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.