diff --git a/.circleci/config.yml b/.circleci/config.yml index b283db5..2ebd157 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,6 +6,8 @@ commands: parameters: python-image: type: string + pytest-marker: + type: string steps: - restore_cache: name: Restoring Pip Cache @@ -19,8 +21,8 @@ commands: mkdir test-reports || true pip install . --user pip install .\[dataframe\] --user - pip install pytest pytest-cov --user - pytest tests --junitxml=test-reports/junit.xml --cov=./ --cov-report xml:coverage.xml + pip install .\[test\] --user + pytest -m "<< parameters.pytest-marker >>" tests --junitxml=test-reports/junit.xml --cov=./ --cov-report xml:coverage.xml - save_cache: name: Saving Pip Cache key: *cache-key @@ -34,7 +36,10 @@ jobs: parameters: python-image: type: string - default: &default-python "cimg/python:3.7" + default: &default-python "cimg/python:3.8" + pytest-marker: + type: string + default: "not integration" docker: - image: << parameters.python-image >> environment: @@ -43,6 +48,7 @@ jobs: - checkout - client-test: python-image: << parameters.python-image >> + pytest-marker: << parameters.pytest-marker >> - store_test_results: path: test-reports - run: @@ -63,21 +69,26 @@ jobs: PIPENV_VENV_IN_PROJECT: true steps: - checkout + - run: + name: Checks style consistency of setup.py. + command: | + pip install flake8 --user + flake8 setup.py - run: name: Checks style consistency across sources. command: | pip install flake8 --user - flake8 influxdb_client_3/ + flake8 influxdb_client_3/ - run: name: Checks style consistency across tests. command: | pip install flake8 --user - flake8 tests/ + flake8 tests/ - run: name: Checks style consistency across examples. command: | pip install flake8 --user - flake8 Examples/ + flake8 Examples/ check-twine: docker: - image: *default-python @@ -130,6 +141,16 @@ workflows: - tests-python: name: test-3.12 python-image: "cimg/python:3.12" + - tests-python: + requires: + - test-3.8 + - test-3.9 + - test-3.10 + - test-3.11 + - test-3.12 + name: test-integration + python-image: *default-python + pytest-marker: "integration" nightly: when: diff --git a/.markdownlint.yml b/.markdownlint.yml new file mode 100644 index 0000000..eeee22d --- /dev/null +++ b/.markdownlint.yml @@ -0,0 +1,5 @@ +{ + "MD024": { + "siblings_only": true + }, +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 89f4298..db1e0b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,15 +1,30 @@ - # Change Log ## 0.5.0 [unreleased] +### Features + +1. [#88](https://github.com/InfluxCommunity/influxdb3-python/pull/88): Add support for named query parameters: + ```python + from influxdb_client_3 import InfluxDBClient3 + + with InfluxDBClient3(host="https://us-east-1-1.aws.cloud2.influxdata.com", + token="my-token", + database="my-database") as client: + + table = client.query("select * from cpu where host=$host", query_parameters={"host": "server01"}) + + print(table.to_pandas()) + + ``` + ### Bugfix -- [#87](https://github.com/InfluxCommunity/influxdb3-python/pull/87): Fix examples to use `write_options` instead of the object name `WriteOptions` +1. [#87](https://github.com/InfluxCommunity/influxdb3-python/pull/87): Fix examples to use `write_options` instead of the object name `WriteOptions` ### Others -- [#84](https://github.com/InfluxCommunity/influxdb3-python/pull/84): Enable packaging type information - `py.typed` +1. [#84](https://github.com/InfluxCommunity/influxdb3-python/pull/84): Enable packaging type information - `py.typed` ## 0.4.0 [2024-04-17] @@ -19,4 +34,4 @@ ### Others -- [#80](https://github.com/InfluxCommunity/influxdb3-python/pull/80): Integrate code style check into CI +1. [#80](https://github.com/InfluxCommunity/influxdb3-python/pull/80): Integrate code style check into CI diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 6f02d4c..7bb877b 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -69,12 +69,26 @@ def _deep_merge(target, source): elif isinstance(target, list) and isinstance(source, list): # If both target and source are lists, concatenate them target.extend(source) - else: + elif source is not None: # For other types, simply replace the target with the source target = source return target +def _merge_options(defaults, exclude_keys=None, custom=None): + """ + Merge default option arguments with custom (user-provided) arguments, + excluding specific keys defined in exclude_keys. + """ + if custom is None or len(custom) == 0: + return defaults + + if exclude_keys is None: + exclude_keys = [] + + return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys}) + + class InfluxDBClient3: def __init__( self, @@ -135,14 +149,6 @@ def __init__( port = query_port_overwrite self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options) - def _merge_options(self, defaults, custom={}): - """ - Merge default option arguments with custom (user-provided) arguments. - """ - if len(custom) == 0: - return defaults - return _deep_merge(defaults, {key: value for key, value in custom.items()}) - def write(self, record=None, database=None, **kwargs): """ Write data to InfluxDB. @@ -214,20 +220,23 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column data_frame_tag_columns=tag_columns, data_frame_timestamp_column=timestamp_column, **kwargs) - def query(self, query, language="sql", mode="all", database=None, **kwargs): - """ - Query data from InfluxDB. - - :param query: The query string. - :type query: str - :param language: The query language; "sql" or "influxql" (default is "sql"). - :type language: str - :param mode: The mode of fetching data (all, pandas, chunk, reader, schema). - :type mode: str + def query(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs): + """Query data from InfluxDB. + + If you want to use query parameters, you can pass them as kwargs: + + >>> client.query("select * from cpu where host=$host", query_parameters={"host": "server01"}) + + :param query: The query to execute on the database. + :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql". + :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk", + "reader" or "schema". Defaults to "all". :param database: The database to query from. If not provided, uses the database provided during initialization. - :type database: str - :param kwargs: FlightClientCallOptions for the query. - :return: The queried data. + :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to + set up per request headers. + :keyword query_parameters: The query parameters to use in the query. + It should be a ``dictionary`` of key-value pairs. + :return: The query result in the specified mode. """ if mode == "polars" and polars is False: raise ImportError("Polars is not installed. Please install it with `pip install polars`.") @@ -241,10 +250,22 @@ def query(self, query, language="sql", mode="all", database=None, **kwargs): "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], "timeout": 300 } - opts = self._merge_options(optargs, kwargs) + opts = _merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs) _options = FlightCallOptions(**opts) - ticket_data = {"database": database, "sql_query": query, "query_type": language} + # + # Ticket data + # + ticket_data = { + "database": database, + "sql_query": query, + "query_type": language + } + # add query parameters + query_parameters = kwargs.get("query_parameters", None) + if query_parameters: + ticket_data["params"] = query_parameters + ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) diff --git a/setup.py b/setup.py index 39cd813..ad9a0df 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,6 @@ import os import re - requires = [ 'reactivex >= 4.0.4', 'certifi >= 14.05.14', @@ -15,6 +14,7 @@ with open("./README.md", "r", encoding="utf-8") as fh: long_description = fh.read() + def get_version_from_github_ref(): github_ref = os.environ.get("GITHUB_REF") if not github_ref: @@ -26,6 +26,7 @@ def get_version_from_github_ref(): return match.group(1) + def get_version(): # If running in GitHub Actions, get version from GITHUB_REF version = get_version_from_github_ref() @@ -35,6 +36,7 @@ def get_version(): # Fallback to a default version if not in GitHub Actions return "v0.0.0" + setup( name='influxdb3-python', version=get_version(), @@ -45,7 +47,12 @@ def get_version(): author_email='contact@influxdata.com', url='https://github.com/InfluxCommunity/influxdb3-python', packages=find_packages(exclude=['tests', 'tests.*', 'examples', 'examples.*']), - extras_require={'pandas': ['pandas'], 'polars': ['polars'], 'dataframe': ['pandas', 'polars']}, + extras_require={ + 'pandas': ['pandas'], + 'polars': ['polars'], + 'dataframe': ['pandas', 'polars'], + 'test': ['pytest', 'pytest-cov'] + }, install_requires=requires, python_requires='>=3.8', classifiers=[ diff --git a/tests/test_deep_merge.py b/tests/test_deep_merge.py new file mode 100644 index 0000000..8f6e04c --- /dev/null +++ b/tests/test_deep_merge.py @@ -0,0 +1,54 @@ +import unittest + +import influxdb_client_3 + + +class TestDeepMerge(unittest.TestCase): + + def test_deep_merge_dicts_with_no_overlap(self): + target = {"a": 1, "b": 2} + source = {"c": 3, "d": 4} + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, {"a": 1, "b": 2, "c": 3, "d": 4}) + + def test_deep_merge_dicts_with_overlap(self): + target = {"a": 1, "b": 2} + source = {"b": 3, "c": 4} + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, {"a": 1, "b": 3, "c": 4}) + + def test_deep_merge_nested_dicts(self): + target = {"a": {"b": 1}} + source = {"a": {"c": 2}} + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, {"a": {"b": 1, "c": 2}}) + + def test_deep_merge_lists(self): + target = [1, 2] + source = [3, 4] + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, [1, 2, 3, 4]) + + def test_deep_merge_non_overlapping_types(self): + target = {"a": 1} + source = [2, 3] + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, [2, 3]) + + def test_deep_merge_none_to_flight(self): + target = { + "headers": [(b"authorization", "Bearer xyz".encode('utf-8'))], + "timeout": 300 + } + source = None + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, target) + + def test_deep_merge_empty_to_flight(self): + target = { + "headers": [(b"authorization", "Bearer xyz".encode('utf-8'))], + "timeout": 300 + } + source = {} + result = influxdb_client_3._deep_merge(target, source) + self.assertEqual(result, target) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py new file mode 100644 index 0000000..0275764 --- /dev/null +++ b/tests/test_influxdb_client_3_integration.py @@ -0,0 +1,44 @@ +import os +import time +import unittest + +import pytest + +from influxdb_client_3 import InfluxDBClient3 + + +@pytest.mark.integration +@pytest.mark.skipif( + not all( + [ + os.getenv('TESTING_INFLUXDB_URL'), + os.getenv('TESTING_INFLUXDB_TOKEN'), + os.getenv('TESTING_INFLUXDB_DATABASE'), + ] + ), + reason="Integration test environment variables not set.", +) +class TestInfluxDBClient3Integration(unittest.TestCase): + + def setUp(self): + host = os.getenv('TESTING_INFLUXDB_URL') + token = os.getenv('TESTING_INFLUXDB_TOKEN') + database = os.getenv('TESTING_INFLUXDB_DATABASE') + + self.client = InfluxDBClient3(host=host, database=database, token=token) + + def tearDown(self): + if self.client: + self.client.close() + + def test_write_and_query(self): + test_id = time.time_ns() + self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") + + sql = 'SELECT * FROM integration_test_python where type=$type and test_id=$test_id' + + df = self.client.query(sql, mode="pandas", query_parameters={'type': 'used', 'test_id': test_id}) + + self.assertEqual(1, len(df)) + self.assertEqual(test_id, df['test_id'][0]) + self.assertEqual(123.0, df['value'][0]) diff --git a/tests/test_merge_options.py b/tests/test_merge_options.py new file mode 100644 index 0000000..a263a41 --- /dev/null +++ b/tests/test_merge_options.py @@ -0,0 +1,28 @@ +import unittest + +import influxdb_client_3 + + +class MergeOptionsTests(unittest.TestCase): + + def test_merge_with_empty_custom(self): + defaults = {"a": 1, "b": 2} + result = influxdb_client_3._merge_options(defaults, custom={}) + self.assertEqual(result, defaults) + + def test_merge_with_none_custom(self): + defaults = {"a": 1, "b": 2} + result = influxdb_client_3._merge_options(defaults, custom=None) + self.assertEqual(result, defaults) + + def test_merge_with_no_excluded_keys(self): + defaults = {"a": 1, "b": 2} + custom = {"b": 3, "c": 4} + result = influxdb_client_3._merge_options(defaults, custom=custom) + self.assertEqual(result, {"a": 1, "b": 3, "c": 4}) + + def test_merge_with_excluded_keys(self): + defaults = {"a": 1, "b": 2} + custom = {"b": 3, "c": 4} + result = influxdb_client_3._merge_options(defaults, exclude_keys=["b"], custom=custom) + self.assertEqual(result, {"a": 1, "b": 2, "c": 4}) diff --git a/tests/test_query.py b/tests/test_query.py new file mode 100644 index 0000000..e2131f6 --- /dev/null +++ b/tests/test_query.py @@ -0,0 +1,53 @@ +import unittest +from unittest.mock import Mock, patch, ANY + +from pyarrow.flight import Ticket + +from influxdb_client_3 import InfluxDBClient3 + + +class QueryTests(unittest.TestCase): + + @patch('influxdb_client_3._InfluxDBClient') + @patch('influxdb_client_3._WriteApi') + @patch('influxdb_client_3.FlightClient') + def setUp(self, mock_flight_client, mock_write_api, mock_influx_db_client): + self.mock_influx_db_client = mock_influx_db_client + self.mock_write_api = mock_write_api + self.mock_flight_client = mock_flight_client + self.client = InfluxDBClient3( + host="localhost", + org="my_org", + database="my_db", + token="my_token" + ) + self.client._flight_client = mock_flight_client + self.client._write_api = mock_write_api + + def test_query_without_parameters(self): + mock_do_get = Mock() + self.client._flight_client.do_get = mock_do_get + + self.client.query('SELECT * FROM measurement') + + expected_ticket = Ticket( + '{"database": "my_db", ' + '"sql_query": "SELECT * FROM measurement", ' + '"query_type": "sql"}'.encode('utf-8') + ) + + mock_do_get.assert_called_once_with(expected_ticket, ANY) + + def test_query_with_parameters(self): + mock_do_get = Mock() + self.client._flight_client.do_get = mock_do_get + + self.client.query('SELECT * FROM measurement WHERE time > $time', query_parameters={"time": "2021-01-01"}) + + expected_ticket = Ticket( + '{"database": "my_db", ' + '"sql_query": "SELECT * FROM measurement WHERE time > $time", ' + '"query_type": "sql", "params": {"time": "2021-01-01"}}'.encode('utf-8') + ) + + mock_do_get.assert_called_once_with(expected_ticket, ANY)