Added user-friendly API to execute SQL statements #295

```python
for row in w.statement_execution.iterate_rows(
        warehouse_id, 'SELECT * FROM samples.nyctaxi.trips LIMIT 10'):
    print(row.as_dict())
```
base: main
Conversation
Couple questions but overall this looks great! Thanks for contributing this.
Nice! Are you planning to support this in all SDKs?
```python
for (pickup_zip, dropoff_zip) in w.statement_execution.execute_fetch_all(
        warehouse_id,
        'SELECT pickup_zip, dropoff_zip FROM nyctaxi.trips LIMIT 10',
        catalog='samples'):
    print(f'pickup_zip={pickup_zip}, dropoff_zip={dropoff_zip}')
```
Force-pushed from `df6d924` to `2bf281d`
```python
for row in w.statement_execution.iterate_rows(
        warehouse_id, 'SELECT * FROM samples.nyctaxi.trips LIMIT 10'):
    print(row.as_dict())
```
Great stuff, left a few comments
```python
@staticmethod
def _parse_timestamp(value: str) -> datetime.datetime:
    # make it work with Python 3.7 to 3.10 as well
    return datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
```
I think this would break when timezone is not UTC
@sander-goos we always return in UTC, afaik
No, it will use the current timezone from the spark session. By default this is UTC but it can be changed by a SQL admin.
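For what it's worth, the `Z → +00:00` rewrite only compensates for `fromisoformat` rejecting a trailing `Z` on Python 3.7–3.10; a non-UTC offset such as `+02:00` passes through the `replace()` untouched and parses fine either way. A standalone sketch of the helper above:

```python
import datetime

def parse_timestamp(value: str) -> datetime.datetime:
    # fromisoformat() on Python 3.7-3.10 rejects a trailing 'Z', so
    # rewrite it as an explicit UTC offset first. Explicit offsets such
    # as '+02:00' are left untouched by the replace().
    return datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))

utc = parse_timestamp('2023-01-01T12:00:00Z')
cet = parse_timestamp('2023-01-01T12:00:00+02:00')
print(utc.tzinfo)        # UTC
print(cet.utcoffset())   # 2:00:00
```

So the parser itself handles non-UTC offsets; the open question above is only what offset the server actually emits.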
```python
warehouse_id: str,
statement: str,
*,
byte_limit: Optional[int] = None,
```
Let's add row_limit as well
It would also be great if we could add parameters (https://docs.databricks.com/api/workspace/statementexecution/executestatement), but we can do that in a follow-up as well.
@sander-goos not in this PR, feel free to add a follow-up PR with these additions.
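As a sketch of what that follow-up could look like: the ExecuteStatement request body accepts a `parameters` list, so an SDK-side `parameters=` argument would largely be payload plumbing. Everything below (`build_execute_payload`, the exact dict shape) is hypothetical illustration, to be checked against the API docs linked above, not the SDK's actual code:

```python
# Hypothetical helper: forward named parameters to the ExecuteStatement
# request body, so ':zip' in the statement binds server-side.
def build_execute_payload(warehouse_id: str, statement: str,
                          parameters: dict = None) -> dict:
    payload = {'warehouse_id': warehouse_id, 'statement': statement}
    if parameters:
        payload['parameters'] = [{'name': name, 'value': str(value)}
                                 for name, value in parameters.items()]
    return payload

payload = build_execute_payload(
    'abc123', 'SELECT * FROM trips WHERE pickup_zip = :zip', {'zip': 10001})
```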
```python
result_data = self.get_statement_result_chunk_n(execute_response.statement_id,
                                                external_link.next_chunk_index)

def _iterate_inline_disposition(self, execute_response: ExecuteStatementResponse) -> Iterator[Row]:
```
Should we remove this if it's not used?
```python
cancel_execution.assert_called_with('bcd')


def test_fetch_all_no_chunks(config, mocker):
```
nit: test_fetch_all_single_chunk
```python
raise DatabricksError(message, error_code=error_code)
raise DatabricksError(status.state.value)

def execute(self,
```
Perhaps we can name this `execute_sync` or `execute_and_wait`, to make clear that it waits for the query to finish, up to some specified timeout?
`_and_wait` has slightly different semantics in the SDK, as those methods return a future-like object.
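To illustrate the distinction: an `_and_wait`-style method returns immediately with a future-like handle whose `result(timeout=...)` does the blocking, whereas the method in this PR blocks inline. A toy sketch of the handle pattern (names and the simulated polling are illustrative, not the SDK's):

```python
import time

class Waiter:
    """Future-like handle: the operation starts now, but the caller
    decides when (and how long) to block for the result."""
    def __init__(self, poll):
        self._poll = poll

    def result(self, timeout: float = 60.0):
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            outcome = self._poll()     # None means "still running"
            if outcome is not None:
                return outcome
            time.sleep(0.01)
        raise TimeoutError(f'timed out after {timeout}s')

def execute_and_wait(statement: str) -> Waiter:
    # Simulated server: two "still running" polls, then the rows.
    polls = iter([None, None, ['row1', 'row2']])
    return Waiter(lambda: next(polls))

rows = execute_and_wait('SELECT 1').result(timeout=5)
```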
```python
msg = f"timed out after {timeout}: {status_message}"
raise TimeoutError(msg)

def iterate_rows(self,
```
I think `iterate_rows` doesn't fully cover what this method is doing. As it handles both execution and result fetching, what about `execute_and_fetch`?
```python
assert isinstance(rows[0].since, datetime.date)
assert isinstance(rows[0].now, datetime.datetime)

http_get.assert_called_with('https://singed-url')
```
Can we assert that we never submit the authorization token when fetching any external link?
will do
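A sketch of what that assertion could look like in the test, using a stand-in `http_get` mock (the `fetch_external_link` helper below is hypothetical, not the PR's actual code):

```python
from unittest.mock import MagicMock

# Stand-in for the mocked transport used in the tests above.
http_get = MagicMock(return_value=MagicMock(status_code=200))

def fetch_external_link(url: str):
    # Presigned URLs must be fetched without workspace credentials.
    return http_get(url, headers={})

fetch_external_link('https://singed-url')

# Assert the presigned-URL request carried no Authorization header.
args, kwargs = http_get.call_args
assert 'Authorization' not in kwargs.get('headers', {})
```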
```python
# ensure that we close the HTTP session after fetching the external links
result_data = execute_response.result
row_factory, col_conv = self._result_schema(execute_response)
with self._api._new_session() as http:
```
Does this make sure we don't use the authorization header that's used for the internal requests? That is important.
@sander-goos `self._api._new_session()` starts an HTTP session without our auth headers, because pre-signed URLs don't expect any Authorization headers. All other requests follow the normal flow.
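A minimal sketch of that separation (a toy client, not the SDK's actual `ApiClient`): the authenticated header set stays on the client, while `_new_session()` hands out a clean session for presigned links:

```python
from contextlib import contextmanager

class ApiClient:
    """Toy client: API calls carry self._headers, but presigned
    external links get a fresh session with no credentials."""
    def __init__(self, token: str):
        self._headers = {'Authorization': f'Bearer {token}'}

    @contextmanager
    def _new_session(self):
        session = {'headers': {}}   # no Authorization header inherited
        try:
            yield session
        finally:
            session.clear()         # session is closed after the block

with ApiClient('secret')._new_session() as http:
    captured = dict(http['headers'])

assert 'Authorization' not in captured
```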
That would be a really nice way to use it in the Airflow provider, because right now the dependencies are too heavyweight.
Initial port from Databricks Python SDK PR: databricks/databricks-sdk-py#295
Execute SQL statements in a stateless manner.

The primary use-case of the :py:meth:`iterate_rows` and :py:meth:`execute` methods is executing SQL queries in a stateless manner straight away from the Databricks SDK for Python, without requiring any external dependencies. Results are fetched in JSON format through presigned external links. This is perfect for serverless applications like AWS Lambda, Azure Functions, or any other containerised short-lived applications, where container startup time is faster with the smaller dependency set.

Method :py:meth:`iterate_rows` returns an iterator of objects that resemble the :class:`pyspark.sql.Row` API, but full compatibility is not the goal of this implementation. When you only need to execute the query and have no need to iterate over results, use :py:meth:`execute`.

Applications that need a more traditional SQL Python API with cursors, efficient transfer of hundreds of megabytes or gigabytes of data serialized in Apache Arrow format, and low result-fetching latency should use the stateful Databricks SQL Connector for Python.
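The Row-like behaviour mentioned above can be pictured with a minimal stand-in (a hypothetical sketch, not the SDK's actual class): attribute access, tuple unpacking, and `as_dict()`, loosely resembling `pyspark.sql.Row`:

```python
class Row(tuple):
    """Minimal Row-like stand-in: an immutable tuple of values that
    also supports column-name attribute access and as_dict()."""
    def __new__(cls, columns, values):
        row = super().__new__(cls, values)
        row._columns = list(columns)
        return row

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails; map the
        # column name to its positional value.
        try:
            return self[self._columns.index(name)]
        except ValueError:
            raise AttributeError(name)

    def as_dict(self):
        return dict(zip(self._columns, self))

row = Row(['pickup_zip', 'dropoff_zip'], [10001, 10017])
pickup, dropoff = row                      # tuple unpacking works
print(row.pickup_zip)                      # 10001
print(row.as_dict())
```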
New Integration tests