Skip to content

Conversation

@avantgardnerio
Copy link
Contributor

Which issue does this PR close?

Closes #92.

Rationale for this change

To use off the shelf tools with Ballista.

What changes are included in this PR?

A minimal FlightSQL service implementation.

Are there any user-facing changes?

They should be able to connect with off-the-shelf tools, given they have correctly installed and configured the FlightSQL JDBC or ODBC drivers.

@avantgardnerio
Copy link
Contributor Author

I was able to retrieve query results from some custom Kotlin JDBC client code I'll post shortly:

[Test worker] INFO FlightSqlStatement - Execute SQL: SELECT 'keep alive'
[Test worker] INFO FlightSqlStatement - Got response: keep alive

@avantgardnerio
Copy link
Contributor Author

Working via custom JDBC driver in DataGrip:

Screenshot from 2022-07-24 17-05-50

@avantgardnerio
Copy link
Contributor Author

The official JDBC driver needs prepared statement support in order to work:

Screenshot from 2022-07-24 17-33-56

> select 'Hello, Apache Arrow Ballista2!'
[2022-07-24 17:33:31] Error while executing SQL "select 'Hello, Apache Arrow Ballista2!'": Implement do_action_create_prepared_statement
[2022-07-24 17:33:31] UNIMPLEMENTED: Implement do_action_create_prepared_statement

Also, it is necessary to set settings to:

  1. no authentication
  2. useEncryption=false

Screenshot from 2022-07-24 17-35-21

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @avantgardnerio this looks like a great start.

@thinkharderdev @yahoNanJing any thoughts?

};
fieps.push(fiep);
}
break fieps;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems no need to use a label here for the break?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty neat. You can actually return a value from a loop, and that's what this is doing. I wasn't aware of it either, until this PR. https://doc.rust-lang.org/rust-by-example/flow_control/loop/return.html

Copy link
Contributor

@thinkharderdev thinkharderdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor nits but this looks awesome!

#[derive(Clone)]
pub struct FlightSqlServiceImpl {
server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
statements: Arc<Mutex<HashMap<Uuid, LogicalPlan>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to save the logical plan then we'd need to eventually persist this to the state backend since the scheduler may be distributed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like I can store things trivially in etcd (or whatever backing store) using server.state.session_manager.state?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort of. There is a bit more structure imposed by the StateBackendClient to try and avoid lock contention in the scheduler. So you have to define a new Keyspace enum for the new data to live in. The it's probably best to add methods in TaskManager to save/fetch the logical plans. That can wrap the serde logic and interaction with the state backend.

let mut fieps: Vec<_> = vec![];
for loc in completed.partition_location.iter() {
let fetch = if let Some(ref id) = loc.partition_id {
let fetch = protobuf::FetchPartition {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I built this directly using the prost generated structs, and it was very painful since all this lacked autocomplete in my IDE. It looked like it was also possible to create Ballista/DataFusion native versions of these structs then From or Into to convert to prost? @andygrove or anyone who knows, I'd love advice on how to do this better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these prost structs have a corresponding struct that is nicer to use (along with conversions from/to). For this case you can convert the prost struct into a ballista_core::serde::scheduler::PartitionLocation which is nicer to use (it will unwrap all the Option fields which protobuf insists on but are not really optional).

use uuid::Uuid;

pub struct FlightSqlServiceImpl {
server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloning the whole server here seems... wrong. Perhaps I should have an Arc<Mutex<>> to it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can probably get away with an Arc<SchedulerServer<>>. The only time you need a mut reference is when you init the server which should only happen once.

.server
.state
.session_manager
.create_session(&config)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably shouldn't create a session every time.

Copy link
Contributor Author

@avantgardnerio avantgardnerio Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at the available parameters (CommandStatementQuery and FlightDescriptor) and not finding anything that would work particularly well as a session-identifier. I assume there must be such a thing lower down at the GRPC level? Anyone know where I can find something like that? Perhaps it was an oversight to omit it from the FlightSqlService interface and a PR is required to arrow?

let mut num_rows = 0;
let mut num_bytes = 0;
let fieps = loop {
sleep(Duration::from_millis(100)).await;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The busy wait is a bit cheesy, but it appears Arrow FlightSQL expects synchronous execution? At least we're in a coroutine context and this isn't blocking a thread, thanks to Tokio, but the polling has been moved out of the client and into the scheduler. I'm not sure how else to do it, just something to be aware of.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we may be able to just use a watch and let the state backend push status updates (instead of polling). You can wrap that in a JobCompletionFuture. Something like

struct JobCompletionFuture {
    job_completion_watch: Watch,
}

impl Future for JobCompletionFuture {
    type Output = Result<JobStatus, BallistaError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match self.job_completion_watch.poll_next_unpin(cx) {
                Poll::Ready(Some(WatchEvent::Put(_key, value))) => {
                   if let Ok(job_status) = decode_protobuf::<JobStatus>(&value) {
                        Poll::Ready(Ok(job_status))
                    } else {
                       Poll::Ready(Err(BallistaError::Internal("some error".to_owned())))
                   }
                }
                Poll::Ready(None) => Poll::Ready(Err(BallistaError::Internal("watch ended with no update".to_owned()))),
                _ => Poll::Pending,
            }
    }
}

It would have to be slightly more complicated than this since we would need to watch both the CompletedJobs/<job id> and FailedJob/<job id> keys but the basic premise would still work I think.

num_rows += stats.num_rows;
num_bytes += stats.num_bytes;
} else {
Err(Status::internal("Error getting stats".to_string()))?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of untested branches here. I'm still trying to figure out how to test properly. If the Rust implementation of the Flight SQL client is far enough along, integration tests would be good. Otherwise (and perhaps in addition to), unit test would be helpful. But so much of this is interacting with other parts of the system, I'm not quite sure how to do it. I'll look over test_poll_work() as a reference.

SchedulerGrpcServer::new(scheduler_server.clone());

let flight_sql_server = FlightServiceServer::new(FlightSqlServiceImpl::new(
scheduler_server.clone(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, cloning the whole server seems wrong, but this PR isn't the first to do so. Is there something I'm missing here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this probably needs to be cleaned up. The other places where we are cloning the scheduler server can probably be refactored to just take an Arc<SchedulerState<>>. Cloning the EventLoop might be more problematic though.

@avantgardnerio
Copy link
Contributor Author

I think the remaining problems I'm having with prepared statements are actually do to the implementation of the JDBC driver, as described here: https://github.com/apache/arrow/pull/12830/files#r929303757

@avantgardnerio avantgardnerio marked this pull request as ready for review July 26, 2022 02:57
@avantgardnerio
Copy link
Contributor Author

I'm undrafting this because it doesn't break anything and works against rafael-telles/arrow#42 and now the code is a little cleaner. I'll probably keep improving it based on the feedback above, but that stuff could probably happen after merge.

@andygrove andygrove merged commit 2e67faa into apache:master Jul 27, 2022
@avantgardnerio avantgardnerio deleted the bg_flight_sql2 branch July 28, 2022 15:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ballista should support Arrow FlightSQL

4 participants