Merged

41 commits
28c3a59
Basic PUT operation. Currently this never executes because the server
Nov 15, 2022
1b245b1
Bump Spark CLI service protocol version being used.
Nov 15, 2022
1239def
Log when attempting a staging operation
Nov 15, 2022
b605cce
Fix failing unit tests since function signature for ExecuteResponse c…
Nov 16, 2022
3ed84d8
Add e2e test for put.
Nov 16, 2022
57b8a34
Bail on tests if staging_ingestion_user is not set
Nov 16, 2022
7812278
Black client.py
Nov 16, 2022
6b76439
Add unit test that sanity checks _handle_staging_operation is called
Nov 17, 2022
3df7c89
Fix imports so that this module can be run independently:
Nov 17, 2022
8f0a02e
Implement GET operation
Nov 17, 2022
55525cb
Refactor client.py into distinct methods for each ingestion command type
Nov 23, 2022
157ac3d
Update pypoetry so I can develop on Python 3.10
Nov 23, 2022
0739ccc
Applied PR feedback around explicit response codes.
Nov 23, 2022
d3a3651
Applying PR feedback
Nov 23, 2022
72f917e
PR feedback
Nov 23, 2022
fba64b7
Black client.py
Nov 23, 2022
c27a3d6
Refactor e2e test to use a single test for PUT, GET, and REMOVE
Nov 23, 2022
19ca706
Make REMOVE command work
Nov 23, 2022
0167bd9
These methods don't need to know the `operation`
Nov 23, 2022
85e4d7c
Remove single quote that broke query
Nov 23, 2022
713002d
Remove unneeded argument
Nov 23, 2022
fc06ef8
Expect operation to succeed
Nov 23, 2022
cafa17d
Black PySQLStagingIngestionTestSuite only
Nov 23, 2022
a508a1c
Tidy up comments in e2e test
Nov 23, 2022
ce80df0
Basic e2e test scaffolded in. Currently fails.
Nov 23, 2022
36885a4
Only allow ingestion commands when base_uploads_path is specified
Nov 23, 2022
c0c09d4
Restrict local file operations to descendants of uploads_base_path
Nov 23, 2022
f612795
Remove per PR feedback
Dec 20, 2022
e609ef3
Add check for null local_file per PR feedback
Dec 20, 2022
cdbe2d6
Open output stream _after_ successful HTTP request
Dec 20, 2022
34a0362
Resolve relative paths before comparing row.localFile to uploads_base…
Dec 20, 2022
c8a64c7
Add test that PUT fails if file exists in staging location and OVERWR…
Dec 20, 2022
d48d3f3
Add tests: operations fail to modify another user's staging location
Dec 20, 2022
e0037e0
Add test that ingestion command fails if local file is blank
Dec 20, 2022
3fa5d84
Add test that invalid staging path will fail at server
Dec 20, 2022
4824b68
Basic usage example (needs tweaking)
Dec 22, 2022
469f35f
Add samples of GET and REMOVE
Dec 22, 2022
bdb948a
Refactor to allow uploads_base_path to be either a single string object
Dec 28, 2022
0261b7a
Refactor uploads_base_path to staging_allowed_local_path
Dec 29, 2022
00d8a49
Fix mypy static type failures
Dec 30, 2022
7a602e6
Black src files
Dec 30, 2022
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
@@ -112,6 +112,7 @@ export access_token=""
There are several e2e test suites available:
- `PySQLCoreTestSuite`
- `PySQLLargeQueriesSuite`
- `PySQLStagingIngestionTestSuite`
- `PySQLRetryTestSuite.HTTP503Suite` **[not documented]**
- `PySQLRetryTestSuite.HTTP429Suite` **[not documented]**
- `PySQLUnityCatalogTestSuite` **[not documented]**
@@ -122,6 +123,12 @@ To execute the core test suite:
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite
```

The `PySQLCoreTestSuite` namespace contains tests for all of the connector's basic features and behaviours. This is the default namespace where tests should be written unless they require specially configured clusters or take an especially long time to execute by design.

The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will as well.

The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 13.x which supports staging ingestion commands.
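For example, to run this suite (the email address below is a placeholder for the user associated with your access token):

```bash
export staging_ingestion_user="some.user@example.com"
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLStagingIngestionTestSuite
```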

The suites marked `[not documented]` require additional configuration which will be documented at a later time.
### Code formatting

57 changes: 57 additions & 0 deletions src/databricks/sql/client.py
@@ -2,6 +2,7 @@

import pandas
import pyarrow
import requests

from databricks.sql import __version__
from databricks.sql import *
@@ -297,6 +298,58 @@ def _check_not_closed(self):
if not self.open:
raise Error("Attempting operation on closed cursor")

def _handle_staging_operation(self):
"""Make HTTP request using instructions provided by server"""

row = self.active_result_set.fetchone()
Collaborator:

I know self.active_result_set is introduced in this PR, so this is merely a generic question rather than one specific to staging.

If we are using a field member self.active_result_set for keeping state, that means we won't be able to support multi-threading in an application which concurrently uses pysql. Is this understanding correct?

Contributor Author:

I'm confused by the first part of your question:

I know self.active_result_set is introduced in this PR.

I don't believe this is correct. active_result_set has been present since the first version of this library. It's present on main right now.

if we are using a field member self.active_result_set for keeping a state that means we won't be able to support multi threading in an application which concurrently uses pysql

You're pulling on a valid thread. But I disagree with this assessment. In general pysql works fine with multi-threading. In fact, multi-threading is required if you want to cancel a running query (which is reflected in PySQLCoreTestSuite.test_cancel_during_execute).

The specific scenario where active_result_set state would affect multi-threaded applications is if multiple threads are working with the same cursor. Is that a desirable usage pattern? I think there is usually one cursor per thread, in which case there's no issue with shared state.
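To illustrate the one-cursor-per-thread pattern described here, a minimal sketch (the connection arguments are placeholders, and this snippet is not part of the PR):

```python
import threading

import databricks.sql

def run_query(connection, query):
    # Each thread opens its own cursor, so no active_result_set
    # state is ever shared between threads.
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print(cursor.fetchall())
    finally:
        cursor.close()

connection = databricks.sql.connect(
    server_hostname="...", http_path="...", access_token="..."
)

threads = [
    threading.Thread(target=run_query, args=(connection, query))
    for query in ("SELECT 1", "SELECT 2")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

connection.close()
```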


# TODO: Handle headers. What format will gateway send? json? plaintext?
operation, presigned_url, local_file, headers = (
row.operation,
row.presignedUrl,
row.localFile,
None,
)

operation_map = {
"PUT": requests.put,
"GET": requests.get,
}

if operation not in operation_map:
raise Error(
"Operation {} is not supported. Supported operations are {}".format(
operation, ",".join(operation_map.keys())
)
)

req_func = operation_map[operation]

if local_file:
raw_data = open(local_file, "rb")

Collaborator:

local_file is super interesting here; I bet customers might try different schemes: dbfs://bla, https://bla, files://bla.

Each of those schemes has a related "open" function, and the client side shall try to understand their ask and support/decline their request.

I would also like to know how many of those schemes we plan to support in the near term and how this function would grow.

Contributor Author:

Preliminary spec is to only support upload of local files. Nothing in dbfs or from an arbitrary URL. That restriction isn't implemented here because it's part of a separate ticket. The basic idea is that uploads will only be possible when a user configures an uploadsBasePath pointing to a mounted volume.

I agree that we need a way to hook this behaviour for other file origins, however. I'm going to noodle how we can make this sufficiently generic for now.

cc: @moderakh

Collaborator:

Ingestion only supports uploading from the local file system, not from anywhere else. Anything else must fail.
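A sketch of the kind of local-path restriction described in this thread, following the commit history ("Restrict local file operations to descendants of uploads_base_path", later renamed to staging_allowed_local_path). The function name and error text here are illustrative, not the merged implementation:

```python
import os

from databricks.sql import Error

def validate_staging_path(local_file: str, staging_allowed_local_path: str) -> None:
    # Resolve symlinks and relative segments before comparing, so a
    # path like /allowed/../secret cannot escape the allowed directory.
    allowed = os.path.realpath(staging_allowed_local_path)
    target = os.path.realpath(local_file)

    if os.path.commonpath([allowed, target]) != allowed:
        raise Error(
            "Local file operations are restricted to paths within the "
            "configured staging_allowed_local_path"
        )
```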

else:
raw_data = None

rq_func_args = dict(url=presigned_url, data=raw_data)

logger.debug(
"Attempting staging operation: {} - {}".format(operation, local_file)
)

# Call the function
resp = req_func(**rq_func_args)

if resp.status_code != 200:
raise Error(
"Staging operation over HTTP was unsuccessful: {}-{}".format(
resp.status_code, resp.text
)
)

if operation == "GET":
with open(local_file, "wb") as fp:
fp.write(resp.content)

def execute(
self, operation: str, parameters: Optional[Dict[str, str]] = None
) -> "Cursor":
@@ -331,6 +384,10 @@ def execute(
self.buffer_size_bytes,
self.arraysize,
)

if execute_response.is_staging_operation:
Contributor Author:

Question for reviewers: is there any specifically desired end-state for the cursor after a staging operation? Maybe we return a new NamedTuple StagingOperationResult with properties of .successful:boolean and perhaps a copy of the operation and localFile that were used?

Collaborator:

I don't quite get this question, but the cursor for now will return just one row and we should have reached the end of this cursor.

Collaborator:

@susodapop could you please explain with a code sample how this will provide a different experience to the end user?
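As a sketch of the end-user experience in question (mirroring the e2e tests below; the hostname, path, token, and file locations are placeholders):

```python
import databricks.sql

connection = databricks.sql.connect(
    server_hostname="...", http_path="...", access_token="..."
)
cursor = connection.cursor()

# Upload a local file into the user's staging location.
cursor.execute(
    "PUT '/tmp/file1.csv' INTO 'stage://tmp/some.user@example.com/file1.csv' OVERWRITE"
)

# Fetch it back. The HTTP transfer happens inside execute(); the
# single-row result set is consumed by _handle_staging_operation.
cursor.execute(
    "GET 'stage://tmp/some.user@example.com/file1.csv' TO '/tmp/file1_copy.csv'"
)

connection.close()
```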

self._handle_staging_operation()

return self

def executemany(self, operation, seq_of_parameters):
5 changes: 4 additions & 1 deletion src/databricks/sql/thrift_backend.py
@@ -452,7 +452,7 @@ def open_session(self, session_configuration, catalog, schema):
initial_namespace = None

open_session_req = ttypes.TOpenSessionReq(
client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6,
client_protocol_i64=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
client_protocol=None,
initialNamespace=initial_namespace,
canUseMultipleCatalogs=True,
@@ -733,6 +733,8 @@ def _results_message_to_execute_response(self, resp, operation_state):
.to_pybytes()
)
lz4_compressed = t_result_set_metadata_resp.lz4Compressed
# TODO: will this fail if metadata doesn't include `isStagingOperation`?
is_staging_operation = t_result_set_metadata_resp.isStagingOperation
if direct_results and direct_results.resultSet:
assert direct_results.resultSet.results.startRowOffset == 0
assert direct_results.resultSetMetadata
@@ -752,6 +754,7 @@
has_been_closed_server_side=has_been_closed_server_side,
has_more_rows=has_more_rows,
lz4_compressed=lz4_compressed,
is_staging_operation=is_staging_operation,
command_handle=resp.operationHandle,
description=description,
arrow_schema_bytes=schema_bytes,
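Regarding the TODO above: a defensive read of the new metadata field could fall back to False when a server omits it. A sketch, not the merged code:

```python
# Treat a missing or null isStagingOperation field as False so that
# responses from older servers keep working.
is_staging_operation = bool(
    getattr(t_result_set_metadata_resp, "isStagingOperation", False)
)
```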
2 changes: 1 addition & 1 deletion src/databricks/sql/utils.py
@@ -40,7 +40,7 @@ def remaining_rows(self) -> pyarrow.Table:

ExecuteResponse = namedtuple(
"ExecuteResponse",
"status has_been_closed_server_side has_more_rows description lz4_compressed "
"status has_been_closed_server_side has_more_rows description lz4_compressed is_staging_operation "
"command_handle arrow_queue arrow_schema_bytes",
)

55 changes: 55 additions & 0 deletions tests/e2e/driver_tests.py
@@ -5,6 +5,7 @@
import logging
import os
import sys
import tempfile
import threading
import time
from unittest import loader, skipIf, skipUnless, TestCase
@@ -14,6 +15,7 @@
import pyarrow
import pytz
import thrift
import pytest

import databricks.sql as sql
from databricks.sql import STRING, BINARY, NUMBER, DATETIME, DATE, DatabaseError, Error, OperationalError
@@ -630,6 +632,59 @@ def test_initial_namespace(self):
cursor.execute("select current_database()")
self.assertEqual(cursor.fetchone()[0], table_name)

class PySQLStagingIngestionTestSuite(PySQLTestCase):
"""Simple namespace for ingestion tests. These should be run against DBR >13.x

In addition to connection credentials (host, path, token) this suite requires an env var
named staging_ingestion_user"""

staging_ingestion_user = os.getenv("staging_ingestion_user")

if staging_ingestion_user is None:
raise ValueError("To run these tests you must designate a `staging_ingestion_user` environment variable. This will be the user associated with the personal access token.")

def test_staging_ingestion_put_and_get(self):

fh, temp_path = tempfile.mkstemp()

original_text = "hello world!".encode("utf-8")

with open(fh, 'wb') as fp:
fp.write(original_text)

with self.connection() as conn:
cursor = conn.cursor()
query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
cursor.execute(query)

# TODO: What is the acceptance test for a successful staging operation?
# For now, let's GET the file back and compare it to the original

new_fh, new_temp_path = tempfile.mkstemp()

with self.connection() as conn:
cursor = conn.cursor()
query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
cursor.execute(query)

with open(new_fh, 'rb') as fp:
fetched_text = fp.read()

assert fetched_text == original_text

os.remove(temp_path)
os.remove(new_temp_path)

def test_staging_ingestion_delete(self):

# Test stub to be completed when we implement DELETE. We need to guarantee this file exists before we attempt to remove it.

with self.connection() as conn:
cursor = conn.cursor()
query = f"REMOVE 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv''"
with pytest.raises(Error):
cursor.execute(query)


def main(cli_args):
global get_args_from_env
6 changes: 4 additions & 2 deletions tests/unit/test_fetches.py
@@ -41,7 +41,8 @@ def make_dummy_result_set_from_initial_results(initial_results):
lz4_compressed=Mock(),
command_handle=None,
arrow_queue=arrow_queue,
arrow_schema_bytes=schema.serialize().to_pybytes()))
arrow_schema_bytes=schema.serialize().to_pybytes(),
is_staging_operation=False))
num_cols = len(initial_results[0]) if initial_results else 0
rs.description = [(f'col{col_id}', 'integer', None, None, None, None, None)
for col_id in range(num_cols)]
@@ -75,7 +76,8 @@ def fetch_results(op_handle, max_rows, max_bytes, expected_row_start_offset, lz4
lz4_compressed=Mock(),
command_handle=None,
arrow_queue=None,
arrow_schema_bytes=None))
arrow_schema_bytes=None,
is_staging_operation=False))
return rs

def assertEqualRowValues(self, actual, expected):
21 changes: 18 additions & 3 deletions tests/unit/tests.py
@@ -12,9 +12,9 @@
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
from databricks.sql.types import Row

from test_fetches import FetchTests
from test_thrift_backend import ThriftBackendTestSuite
from test_arrow_queue import ArrowQueueSuite
from tests.unit.test_fetches import FetchTests
from tests.unit.test_thrift_backend import ThriftBackendTestSuite
from tests.unit.test_arrow_queue import ArrowQueueSuite


class ClientTestSuite(unittest.TestCase):
@@ -534,6 +534,21 @@ def test_cursor_keeps_connection_alive(self, mock_client_class):
self.assertEqual(instance.close_session.call_count, 0)
cursor.close()

@patch("%s.client.ThriftBackend" % PACKAGE_NAME)
@patch("%s.client.Cursor._handle_staging_operation" % PACKAGE_NAME)
@patch("%s.utils.ExecuteResponse" % PACKAGE_NAME)
def test_staging_operation_response_is_handled(self, mock_execute_response, mock_handle_staging_operation, mock_client_class):
# If server sets ExecuteResponse.is_staging_operation True then _handle_staging_operation should be called

mock_execute_response.is_staging_operation = True

connection = databricks.sql.connect(**self.DUMMY_CONNECTION_ARGS)
cursor = connection.cursor()
cursor.execute("Text of some staging operation command;")
connection.close()

mock_handle_staging_operation.assert_called_once_with()


if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])