Arrow Flight Connector Template #24427

Merged
BryanCutler merged 1 commit into prestodb:master from BryanCutler:arrow-connector-java-23032
Jan 29, 2025

@BryanCutler BryanCutler commented Jan 24, 2025

Description

This adds the Arrow Flight base module as a template for building connectors that use a Flight service to exchange data with Presto in Arrow format. A concrete Arrow Flight connector extends BaseArrowFlightClientHandler, which connects to a Flight service that handles the RPC calls issued by the client handler. An example connector implementation is provided in the unit tests.

Supersedes: #23032

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0004-arrow-flight-connector.md

Motivation and Context

Adds the ability to easily create an Arrow Flight connector that can connect with any data source supported by the Flight service and transfer data efficiently using the Arrow format.

Impact

Adding new base-arrow-flight module.

Test Plan

Unit tests are added that use a testing Arrow Flight connector implementation. These include an extension of com.facebook.presto.tests.AbstractTestQueries that tests general queries over Flight, plus round-trip tests of all data types supported by the ArrowBlockBuilder. An ArrowFlightQueryRunner is also provided that can be run standalone for manual query testing.

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add Arrow Flight connector :pr:`24427`
* Add documentation for the :doc:`/connector/base-arrow-flight`  :pr:`24427`

@BryanCutler BryanCutler requested review from a team, elharo and steveburnett as code owners January 24, 2025 19:18
@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Jan 24, 2025
@prestodb-ci prestodb-ci requested review from a team, aditi-pandit and psnv03 and removed request for a team January 24, 2025 19:18
@BryanCutler BryanCutler mentioned this pull request Jan 24, 2025
@BryanCutler BryanCutler requested a review from tdcmeehan January 24, 2025 19:20
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-jdbc</artifactId>
<version>${dep.arrow.version}</version>
<scope>test</scope>
Contributor Author

Moved this to a test dependency.

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<scope>runtime</scope>
Contributor Author

This is needed as a runtime dependency

public Block buildBlockFromFieldVector(FieldVector vector, Type type, DictionaryProvider dictionaryProvider)
{
// Use Arrow dictionary to create a DictionaryBlock
if (dictionaryProvider != null && vector.getField().getDictionary() != null) {
Contributor Author

A DictionaryBlock is created only if a dictionary provider is supplied (by the FlightStream) and the Arrow Field contains a dictionary
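
To make the guard above concrete, here is a minimal sketch in plain Java of what dictionary encoding means for the conversion: each row stores an integer id, and the value is looked up in a shared dictionary. `DictionarySketch`, `useDictionaryPath` and `decode` are hypothetical names used only for illustration; they are not the Arrow `DictionaryProvider` or Presto `DictionaryBlock` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in; models the idea only, not the Arrow or Presto classes.
final class DictionarySketch {
    // Mirrors the condition in the diff: take the dictionary path only when
    // both a provider and a dictionary encoding on the field are present.
    static boolean useDictionaryPath(Object dictionaryProvider, Object fieldDictionaryEncoding) {
        return dictionaryProvider != null && fieldDictionaryEncoding != null;
    }

    // Resolve each id against the dictionary; a null id stands for a null row.
    static List<String> decode(Integer[] ids, String[] dictionary) {
        List<String> values = new ArrayList<>(ids.length);
        for (Integer id : ids) {
            values.add(id == null ? null : dictionary[id]);
        }
        return values;
    }

    public static void main(String[] args) {
        String[] dictionary = {"a", "b", "c"};
        Integer[] ids = {0, 2, null, 0};
        System.out.println(decode(ids, dictionary));
        System.out.println(Arrays.toString(dictionary));
    }
}
```

The ids stay small and the dictionary is stored once, which is why it can be worth keeping the data dictionary-encoded instead of expanding every row.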

// TODO: need method handles to construct MapType
throw new UnsupportedOperationException("MapType is currently unsupported");
}
case Struct: {
Contributor Author

@BryanCutler BryanCutler Jan 24, 2025

Added Struct -> RowType

return BooleanType.BOOLEAN;
case Time:
return TimeType.TIME;
case List: {
Contributor Author

Added List type -> ArrayType

Field entryField = children.get(0);
checkArgument(entryField.getChildren().size() == 2, "Arrow Map entries expected to have 2 child Fields, got: " + entryField.getChildren().size());
// TODO: need method handles to construct MapType
throw new UnsupportedOperationException("MapType is currently unsupported");
Contributor Author

I could not implement the MapType because I still need to figure out how to define the MethodHandles. Any ideas for this, @tdcmeehan?

}
}

private void assignBlockFromValueVector(ValueVector vector, Type type, BlockBuilder builder, int startIndex, int endIndex)
Contributor Author

Modified this to work with nested data types

}
}

public void assignBlockFromListVector(ListVector vector, Type type, BlockBuilder builder, int startIndex, int endIndex)
Contributor Author

Added this to extract values directly from the element vector
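
As background for reading values out of the element vector: in the Arrow list layout, row `i` covers the flat element range from `offsets[i]` up to `offsets[i + 1]`. The sketch below shows that slicing in plain Java. `ListOffsetsSketch` and `rows` are hypothetical names, the arrays stand in for Arrow buffers, and treating `endIndex` as exclusive is an assumption made for the example, not something the diff confirms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; plain arrays model the offsets, validity and element buffers.
final class ListOffsetsSketch {
    // Builds one Java list per row in [startIndex, endIndex); endIndex is assumed exclusive.
    static List<List<Integer>> rows(int[] offsets, boolean[] valid, int[] elements, int startIndex, int endIndex) {
        List<List<Integer>> out = new ArrayList<>();
        for (int row = startIndex; row < endIndex; row++) {
            if (!valid[row]) {
                out.add(null); // null list, distinct from an empty list
                continue;
            }
            List<Integer> values = new ArrayList<>();
            // Row `row` owns elements[offsets[row]] .. elements[offsets[row + 1] - 1].
            for (int i = offsets[row]; i < offsets[row + 1]; i++) {
                values.add(elements[i]);
            }
            out.add(values);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] offsets = {0, 2, 2, 5};
        boolean[] valid = {true, false, true};
        int[] elements = {10, 20, 30, 40, 50};
        System.out.println(rows(offsets, valid, elements, 0, 3));
    }
}
```

Reading straight from the flat element storage avoids materializing an intermediate object per row, which is presumably the motivation for the change described above.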

}
}

public void assignBlockFromStructVector(StructVector vector, Type type, BlockBuilder builder, int startIndex, int endIndex)
Contributor Author

Added this to extract values from all child vectors of the RowType

this.metadata = requireNonNull(metadata, "Metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.connectorAllocator = requireNonNull(connectorAllocator, "connectorAllocator is null");
Contributor Author

Changed this to use a BufferAllocator instance. The connector will own this allocator and close it on shutdown. Flight clients created by the connector create a child allocator from this that is closed with the client.
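
The ownership model described above can be sketched in plain Java as a parent/child resource hierarchy. `SketchAllocator` and `SketchFlightClient` are hypothetical stand-ins written for this illustration; they are not the Arrow `BufferAllocator` or `FlightClient` classes, and the rule that a parent refuses to close while children are open is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an allocator that tracks its open children.
final class SketchAllocator implements AutoCloseable {
    private final String name;
    private final SketchAllocator parent;
    private final List<SketchAllocator> children = new ArrayList<>();
    private boolean closed;

    SketchAllocator(String name) {
        this(name, null);
    }

    private SketchAllocator(String name, SketchAllocator parent) {
        this.name = name;
        this.parent = parent;
    }

    SketchAllocator newChild(String childName) {
        if (closed) {
            throw new IllegalStateException(name + " is closed");
        }
        SketchAllocator child = new SketchAllocator(childName, this);
        children.add(child);
        return child;
    }

    int openChildren() {
        return children.size();
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        // Sketch rule (an assumption): closing a parent with open children is an error.
        if (!children.isEmpty()) {
            throw new IllegalStateException(name + " still has open child allocators");
        }
        closed = true;
        if (parent != null) {
            parent.children.remove(this);
        }
    }
}

// Hypothetical client wrapper: takes a child of the connector allocator and closes it with the client.
final class SketchFlightClient implements AutoCloseable {
    private final SketchAllocator allocator;

    SketchFlightClient(SketchAllocator connectorAllocator, String name) {
        this.allocator = connectorAllocator.newChild(name);
    }

    @Override
    public void close() {
        allocator.close();
    }
}
```

With this shape, the connector creates one allocator at startup, each client borrows a child for its lifetime, and connector shutdown can only succeed once every client has been closed.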


ArrowErrorCode(int code, ErrorType type)
{
errorCode = new ErrorCode(code + 0x0510_0000, name(), type);
Contributor Author

changed error code range to be unique
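
A minimal sketch of the offset technique in the diff above: each enum constant carries a small local code, and a fixed base shifts it into a range reserved for this connector. The base value `0x0510_0000` comes from the diff; `SketchErrorCode`, `SketchErrorRanges` and the constant names are hypothetical and do not reproduce the Presto `ErrorCode` classes.

```java
// Hypothetical holder for the base offset; the value is taken from the diff above.
final class SketchErrorRanges {
    static final int ARROW_FLIGHT_BASE = 0x0510_0000;

    private SketchErrorRanges() {}
}

// Hypothetical error-code enum; the constant names are made up for illustration.
enum SketchErrorCode {
    CLIENT_ERROR(0),
    METADATA_ERROR(1),
    INVALID_RESPONSE(2);

    private final int code;

    SketchErrorCode(int localCode) {
        if (localCode < 0) {
            throw new IllegalArgumentException("localCode is negative");
        }
        // Shift the small local code into the connector's reserved range.
        this.code = localCode + SketchErrorRanges.ARROW_FLIGHT_BASE;
    }

    int code() {
        return code;
    }

    public static void main(String[] args) {
        for (SketchErrorCode value : values()) {
            System.out.println(value.name() + " -> 0x" + Integer.toHexString(value.code()));
        }
    }
}
```

Because every connector adds a different base, two connectors can both use local code 1 without their final codes colliding.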

public void configure(Binder binder)
{
configBinder(binder).bindConfig(ArrowFlightConfig.class);
binder.bind(BufferAllocator.class).to(RootAllocator.class).in(Scopes.SINGLETON);
Contributor Author

Added this as the default RootAllocator instance, will be owned by the connector

this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
requireNonNull(clientHandler, "clientHandler is null");
this.arrowBlockBuilder = requireNonNull(arrowBlockBuilder, "arrowBlockBuilder is null");
this.flightStreamAndClient = clientHandler.getFlightStream(split, connectorSession);
Contributor Author

Cleaned up the stream usage and closing here

@Override
public Page getNextPage()
{
logger.debug("Reading next Arrow record batch");
Contributor Author

Added some debug logs

implements ConnectorTableHandle
{
private final String schema;
private final String table;
Contributor Author

There was previously discussion about making these optional #23032 (review)

import static java.nio.file.Files.newInputStream;
import static java.util.Objects.requireNonNull;

public abstract class BaseArrowFlightClientHandler
Contributor Author

Renamed this to better indicate that it is a wrapper around the client and not an instance of the client; the old name was confusing.


public BaseArrowFlightClientHandler(BufferAllocator allocator, ArrowFlightConfig config)
{
this.allocator = requireNonNull(allocator, "allocator is null");
Contributor Author

Changed allocator usage here: it no longer creates a root allocator for each client instance. Instead, an allocator is required in the constructor.

}
}

protected ClientClosingFlightStream getFlightStream(ArrowSplit split, ConnectorSession connectorSession)
Contributor Author

Removed the Ticket parameter. Since it is in the ArrowSplit already, we can just deserialize once

@BryanCutler BryanCutler force-pushed the arrow-connector-java-23032 branch from b530fd2 to 2d81e93 Compare January 25, 2025 01:02
public class TestingArrowFlightPlugin
extends ArrowPlugin
{
public TestingArrowFlightPlugin()
Contributor

This is part of the testing connector, so it should go into the testingConnector package.


public class TestingH2DatabaseSetup
{
private static final Logger logger = Logger.get(TestingH2DatabaseSetup.class);
Contributor

This is not part of the testing connector, so it can be moved to an outside package.

public class TestingArrowProducer
implements FlightProducer
{
private final BufferAllocator allocator;
Contributor

A connector implementation does not need this, right? It can be moved to an outside package.

Contributor Author

I grouped everything needed by the connector into this package, including the server stuff. It seems like it would be a little clearer to move all testing-server-related classes into their own package, testingServer. I'll update the docs to explain this too.

- ``ArrowBlockBuilder.java``
This class builds Presto blocks from Arrow vectors. Extend this class if needed and override ``getPrestoTypeFromArrowField`` method, if any customizations are needed for the conversion of Arrow vector to Presto type. A binding for this class should be created in the ``Module`` for the plugin.

A reference implementation of the presto-base-arrow-flight module in the test folder of this module uses a locally started Flight server to fetch data from an H2 database. The classes ``TestingArrowBlockBuilder``, ``TestingArrowFlightClientHandler``, ``TestingArrowFlightPlugin``, ``TestingArrowFlightRequest``, ``TestingArrowFlightResponse``, ``TestingArrowModule`` and ``TestingArrowQueryBuilder`` make up the reference implementation.
Contributor

Doc can refer to the testingConnector package to indicate the reference connector implementation

public void close()
{
if (flightStreamAndClient.hasRoot()) {
completed = true;
Contributor

@elbinpallimalilibm elbinpallimalilibm Jan 26, 2025

Why set completed to true only when flight stream has root?

Contributor Author

This is how it was before, but looking into it further, it's not needed here. When flightStreamAndClient.next() returns false, that sets the completed flag, indicating that no more pages will be produced. I'll take this out.
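
The behavior described in this reply can be sketched in plain Java: the completed flag is set only when the stream reports that it has no more batches, and `close()` just releases resources. `SketchPageSource` is a hypothetical stand-in in which an `Iterator<int[]>` plays the role of the Flight stream and an `int[]` plays the role of a page; it is not the connector's page source class.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for a page source reading batches from a stream.
final class SketchPageSource implements AutoCloseable {
    private final Iterator<int[]> stream;
    private boolean completed;
    private boolean closed;

    SketchPageSource(Iterator<int[]> stream) {
        this.stream = stream;
    }

    // Returns the next page, or null when no page is available.
    int[] getNextPage() {
        if (closed || completed) {
            return null;
        }
        if (!stream.hasNext()) {
            // The only place the flag is set: the stream is exhausted.
            completed = true;
            return null;
        }
        return stream.next();
    }

    boolean isFinished() {
        return completed;
    }

    @Override
    public void close() {
        // Releases the stream; deliberately does not touch `completed`.
        closed = true;
    }

    public static void main(String[] args) {
        SketchPageSource source = new SketchPageSource(
                Arrays.asList(new int[] {1, 2}, new int[] {3}).iterator());
        int[] page;
        while ((page = source.getNextPage()) != null) {
            System.out.println(Arrays.toString(page));
        }
        System.out.println("finished=" + source.isFinished());
        source.close();
    }
}
```

Keeping the flag tied to stream exhaustion means an early `close()` does not claim that all data was read.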

@tdcmeehan tdcmeehan self-assigned this Jan 27, 2025
@BryanCutler BryanCutler force-pushed the arrow-connector-java-23032 branch 5 times, most recently from 59e58f4 to 9986850 Compare January 28, 2025 03:43
tdcmeehan previously approved these changes Jan 28, 2025
public ArrowSplit(
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("flightEndpoint") byte[] flightEndpoint)
Contributor

Let's call it flightEndpointBytes to indicate it's a serialized representation of the endpoint.

Contributor Author

Yeah, sounds better

}

logger.info("Executing query: " + query);
query = query.toUpperCase(); // Optionally, to maintain consistency
Contributor

This comment doesn't appear to mean anything?

Contributor Author

I'm not really sure. I'll remove the comment and keep the uppercase conversion, in case that is preferred by the h2 driver.

This adds the Arrow Flight base module as a template to build connectors
that use a Flight service to transfer data with Presto in Arrow format.
A concrete Arrow Flight connector implementation will extend the
BaseArrowFlightClientHandler that can connect to a Flight service that
handles the required RPC calls from the client handler. An example
connector implementation is provided in the unit testing.

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0004-arrow-flight-connector.md

Co-authored-by: sai bhaskar reddy <sai.bhaskar.reddy.sabbasani1@ibm.com>
Co-authored-by: SthuthiGhosh9400 <Sthuthi.Ghosh@ibm.com>
Co-authored-by: lithinwxd <Lithin.Purushothaman@ibm.com>
Co-authored-by: Steve Burnett <burnett@pobox.com>
Co-authored-by: elbinpallimalilibm <elbin.pallimalil@ibm.com>
Co-authored-by: Tim Meehan <tim@timdmeehan.com>
@BryanCutler BryanCutler merged commit 4464129 into prestodb:master Jan 29, 2025
55 checks passed
@BryanCutler BryanCutler deleted the arrow-connector-java-23032 branch January 29, 2025 22:05
shangm2 pushed a commit to shangm2/presto that referenced this pull request Jan 30, 2025
jp-sivaprasad pushed a commit to jp-sivaprasad/presto that referenced this pull request Mar 10, 2025
@prestodb-ci prestodb-ci mentioned this pull request Mar 28, 2025