-
Notifications
You must be signed in to change notification settings - Fork 128
SI: Implement put operations #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
28c3a59
1b245b1
1239def
b605cce
3ed84d8
57b8a34
7812278
6b76439
3df7c89
8f0a02e
55525cb
157ac3d
0739ccc
d3a3651
72f917e
fba64b7
c27a3d6
19ca706
0167bd9
85e4d7c
713002d
fc06ef8
cafa17d
a508a1c
ce80df0
36885a4
c0c09d4
f612795
e609ef3
cdbe2d6
34a0362
c8a64c7
d48d3f3
e0037e0
3fa5d84
4824b68
469f35f
bdb948a
0261b7a
00d8a49
7a602e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,9 @@ | |
|
|
||
| import pandas | ||
| import pyarrow | ||
| import requests | ||
| import json | ||
| import os | ||
|
|
||
| from databricks.sql import __version__ | ||
| from databricks.sql import * | ||
|
|
@@ -28,7 +31,7 @@ def __init__( | |
| session_configuration: Dict[str, Any] = None, | ||
| catalog: Optional[str] = None, | ||
| schema: Optional[str] = None, | ||
| **kwargs | ||
| **kwargs, | ||
| ) -> None: | ||
| """ | ||
| Connect to a Databricks SQL endpoint or a Databricks cluster. | ||
|
|
@@ -173,7 +176,7 @@ def read(self) -> Optional[OAuthToken]: | |
| http_path, | ||
| (http_headers or []) + base_headers, | ||
| auth_provider, | ||
| **kwargs | ||
| **kwargs, | ||
| ) | ||
|
|
||
| self._session_handle = self.thrift_backend.open_session( | ||
|
|
@@ -297,6 +300,117 @@ def _check_not_closed(self): | |
| if not self.open: | ||
| raise Error("Attempting operation on closed cursor") | ||
|
|
||
def _handle_staging_operation(self, uploads_base_path: str):
    """Fetch the HTTP request instruction from a staging ingestion command
    and call the designated handler.

    The instruction is read as one row from ``self.active_result_set``. Its
    ``operation`` field selects the handler (GET, PUT or REMOVE); its
    ``presignedUrl``, ``headers`` and optional ``localFile`` fields are
    forwarded to that handler.

    Raise an exception if uploads_base_path is None, if no instruction row
    is available, if localFile is specified by the server but the localFile
    is not descended from uploads_base_path, or if the operation is not one
    of GET, PUT, REMOVE.
    """

    if uploads_base_path is None:
        raise Error(
            "You must provide an uploads_base_path when initialising a connection to perform ingestion commands"
        )

    row = self.active_result_set.fetchone()
    if row is None:
        # Without a row there is nothing to dispatch; fail with the
        # package's own exception instead of an AttributeError below.
        raise Error("Staging operation did not return an instruction row")

    local_file = getattr(row, "localFile", None)

    if local_file:
        # os.path.commonpath compares path components literally: it does not
        # collapse ".." and it drops trailing separators from its result.
        # Resolve both paths first so that "<base>/../other" is rejected and
        # a base path written with a trailing separator is still accepted.
        resolved_base = os.path.realpath(uploads_base_path)
        resolved_file = os.path.realpath(local_file)

        try:
            common = os.path.commonpath([resolved_base, resolved_file])
            is_within_base = os.path.normcase(common) == os.path.normcase(
                resolved_base
            )
        except ValueError:
            # commonpath raises when the paths cannot share a root
            # (for example, different drives); that is never "inside".
            is_within_base = False

        if not is_within_base:
            raise Error(
                "Local file operations are restricted to paths within the configured uploads_base_path"
            )

        # Hand the handler the exact path that was validated.
        local_file = resolved_file

    # TODO: Experiment with DBR sending real headers.
    # The specification says headers will be in JSON format but the current null value is actually an empty list []
    handler_args = {
        "presigned_url": row.presignedUrl,
        "local_file": local_file,
        "headers": json.loads(row.headers or "{}"),
    }

    logger.debug(
        f"Attempting staging operation indicated by server: {row.operation} - {getattr(row, 'localFile', '')}"
    )

    # TODO: Create a retry loop here to re-attempt if the request times out or fails
    if row.operation == "GET":
        return self._handle_staging_get(**handler_args)
    elif row.operation == "PUT":
        return self._handle_staging_put(**handler_args)
    elif row.operation == "REMOVE":
        # Local file isn't needed to remove a remote resource
        handler_args.pop("local_file")
        return self._handle_staging_remove(**handler_args)
    else:
        raise Error(
            f"Operation {row.operation} is not supported. "
            + "Supported operations are GET, PUT, and REMOVE"
        )
|
|
||
def _handle_staging_put(
    self, presigned_url: str, local_file: str, headers: dict = None
):
    """Upload the contents of local_file to presigned_url with an HTTP PUT.

    The file is opened in binary mode and passed to the request as its body.
    Status codes 200, 201, 202 and 204 are treated as success; a 202 is
    additionally reported through a debug log message.

    Raise an exception if local_file is None or if the response carries any
    other status code. Returns no data.
    """

    if local_file is None:
        raise Error("Cannot perform PUT without specifying a local_file")

    with open(local_file, "rb") as upload_stream:
        response = requests.put(
            url=presigned_url, data=upload_stream, headers=headers
        )

    # 202 is kept in its own name because it also triggers the log below.
    accepted = requests.codes.accepted
    success_codes = (
        requests.codes.ok,  # 200
        requests.codes.created,  # 201
        requests.codes.no_content,  # 204
        accepted,  # 202
    )

    status = response.status_code
    if status not in success_codes:
        raise Error(
            f"Staging operation over HTTP was unsuccessful: {response.status_code}-{response.text}"
        )

    if status == accepted:
        logger.debug(
            f"Response code {accepted} from server indicates ingestion command was accepted "
            + "but not yet applied on the server. It's possible this command may fail later."
        )
|
|
||
def _handle_staging_get(
    self, local_file: str, presigned_url: str, headers: dict = None
):
    """Make an HTTP GET request, create a local file with the received data

    The request is issued and checked before local_file is opened, so a
    failed download neither creates an empty file nor truncates a file that
    already exists at that path.

    Raise an exception if local_file is None or if the request fails.
    Returns no data.
    """

    if local_file is None:
        raise Error("Cannot perform GET without specifying a local_file")

    r = requests.get(url=presigned_url, headers=headers)

    # response.ok verifies the status code is not between 400-600.
    # Any 2xx or 3xx will evaluate r.ok == True
    if not r.ok:
        raise Error(
            f"Staging operation over HTTP was unsuccessful: {r.status_code}-{r.text}"
        )

    # Only touch the destination once there is a successful body to store.
    with open(local_file, "wb") as fp:
        fp.write(r.content)
|
|
||
def _handle_staging_remove(self, presigned_url: str, headers: dict = None):
    """Delete the remote resource by sending an HTTP DELETE to presigned_url.

    Raise an exception if the response's ``ok`` flag is false. Returns no data.
    """

    response = requests.delete(url=presigned_url, headers=headers)

    if response.ok:
        return

    raise Error(
        f"Staging operation over HTTP was unsuccessful: {response.status_code}-{response.text}"
    )
|
|
||
| def execute( | ||
| self, operation: str, parameters: Optional[Dict[str, str]] = None | ||
| ) -> "Cursor": | ||
|
|
@@ -331,6 +445,12 @@ def execute( | |
| self.buffer_size_bytes, | ||
| self.arraysize, | ||
| ) | ||
|
|
||
| if execute_response.is_staging_operation: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Question for reviewers: is there any specifically desired end-state for the cursor?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't quite get this question, but for now the cursor will return just one row, and we should have reached the end of this cursor.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @susodapop could you please explain, with sample code, how this will provide a different experience to the end user? |
||
| self._handle_staging_operation( | ||
| uploads_base_path=self.thrift_backend.uploads_base_path | ||
| ) | ||
|
|
||
| return self | ||
|
|
||
| def executemany(self, operation, seq_of_parameters): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.