diff --git a/.circleci/config.yml b/.circleci/config.yml index 5d41fe4..b283db5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -68,6 +68,16 @@ jobs: command: | pip install flake8 --user flake8 influxdb_client_3/ + - run: + name: Checks style consistency across tests. + command: | + pip install flake8 --user + flake8 tests/ + - run: + name: Checks style consistency across examples. + command: | + pip install flake8 --user + flake8 Examples/ check-twine: docker: - image: *default-python @@ -102,7 +112,7 @@ workflows: not: equal: [ scheduled_pipeline, << pipeline.trigger_source >> ] jobs: -# - check-code-style + - check-code-style # - check-docstyle - check-twine - tests-python: diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..280bdab --- /dev/null +++ b/.flake8 @@ -0,0 +1,6 @@ +[flake8] +count = True +max-line-length = 120 + +# W504: Line break occurred after a binary operator +ignore = W504 diff --git a/CHANGELOG.md b/CHANGELOG.md index 4abd34e..322fd3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,4 +2,8 @@ ### Bugfix -1. [#77](https://github.com/InfluxCommunity/influxdb3-python/pull/77): Support using pandas nullable types \ No newline at end of file +1. [#77](https://github.com/InfluxCommunity/influxdb3-python/pull/77): Support using pandas nullable types + +### Others + +- [#80](https://github.com/InfluxCommunity/influxdb3-python/pull/80): Integrate code style check into CI diff --git a/Examples/batching_example.py b/Examples/batching_example.py index c3c63ac..05490d3 100644 --- a/Examples/batching_example.py +++ b/Examples/batching_example.py @@ -1,13 +1,10 @@ +import datetime import random -import pymongo -import pandas as pd + from bson import ObjectId + import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError -import datetime -import time class BatchingCallback(object): @@ -42,26 +39,24 @@ def retry(self, conf, data: str, exception: InfluxDBError): callback = BatchingCallback() write_options = WriteOptions(batch_size=5_000, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) # Opening InfluxDB client with a batch size of 5k points or flush interval # of 10k ms and gzip compression with InfluxDBClient3.InfluxDBClient3(token=token, host=url, org=org, database=database, enable_gzip=True, write_client_options=wco) as _client: - - # Creating iterator for one hour worth of data (6 sensor readings per # minute) for i in range(0, 525600): @@ -93,25 +88,25 @@ def retry(self, conf, data: str, exception: InfluxDBError): bcWh).field( "bdW", bdW).field( - "bdWh", - bdWh).field( - "cPvWh", - cPvWh).field( - "cW", - cW).field( - "cWh", - cWh).field( - "eWh", - eWh).field( - "iWh", - iWh).field( - "pW", - pW).field( - "pWh", - pWh).field( - "scWh", - scWh).time( - now.strftime('%Y-%m-%dT%H:%M:%SZ'), + "bdWh", + bdWh).field( + "cPvWh", + cPvWh).field( + "cW", + cW).field( + "cWh", + cWh).field( + "eWh", + eWh).field( + "iWh", + iWh).field( + "pW", + pW).field( + "pWh", + pWh).field( + "scWh", + scWh).time( + now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S) # Writing point (InfluxDB automatically batches writes into sets of diff --git a/Examples/cloud_dedicated_query.py b/Examples/cloud_dedicated_query.py index 99b3a35..519b673 100644 --- a/Examples/cloud_dedicated_query.py +++ b/Examples/cloud_dedicated_query.py @@ -1,6 +1,4 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np client = InfluxDBClient3.InfluxDBClient3( token="", @@ -8,7 +6,6 @@ org="6a841c0c08328fb1", database="flight2") - table = client.query( query="SELECT * FROM flight WHERE time > now() - 4h", language="influxql") diff --git a/Examples/community/custom_url.py b/Examples/community/custom_url.py index ff94190..7150f72 100644 --- a/Examples/community/custom_url.py +++ b/Examples/community/custom_url.py @@ -1,7 +1,9 @@ -from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options -import pandas as pd import random +import pandas as pd + +from influxdb_client_3 import InfluxDBClient3, InfluxDBError, WriteOptions, write_client_options + class BatchingCallback(object): @@ -14,36 +16,38 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") -callback = BatchingCallback() +callback = BatchingCallback() write_options = WriteOptions(batch_size=100, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) client = InfluxDBClient3( token="", host="https://eu-central-1-1.aws.cloud2.influxdata.com:442", org="6a841c0c08328fb1", - database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443, query_port_overwrite=443) + database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443, + query_port_overwrite=443) -now = pd.Timestamp.now(tz='UTC').floor('ms') +now = pd.Timestamp.now(tz='UTC').floor('ms') # Lists of possible trainers trainers = ["ash", "brock", "misty", "gary", "jessie", "james"] # Read the CSV into a DataFrame -pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") +pokemon_df = pd.read_csv( + "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") # noqa: E501 # Creating an empty list to store the data data = [] @@ -57,17 +61,17 @@ def retry(self, conf, data: str, exception: InfluxDBError): # Generating random data for i in range(num_entries): trainer = random.choice(trainers) - + # Randomly select a row from pokemon_df random_pokemon = pokemon_df.sample().iloc[0] caught = random_pokemon['Name'] - + # Count the number of times this trainer has caught this Pokémon if (trainer, caught) in trainer_pokemon_counts: trainer_pokemon_counts[(trainer, caught)] += 1 else: trainer_pokemon_counts[(trainer, caught)] = 1 - + # Get the number for this combination of trainer and Pokémon num = trainer_pokemon_counts[(trainer, caught)] @@ -93,9 +97,8 @@ def retry(self, conf, data: str, exception: InfluxDBError): # Print the DataFrame print(caught_pokemon_df) - try: client.write(caught_pokemon_df, data_frame_measurement_name='caught', - data_frame_tag_columns=['trainer', 'id', 'num']) + data_frame_tag_columns=['trainer', 'id', 'num']) except Exception as e: - print(f"Error writing point: {e}") \ No newline at end of file + print(f"Error writing point: {e}") diff --git a/Examples/community/database_transfer.py b/Examples/community/database_transfer.py index 3fcdc5b..c93fc84 100644 --- a/Examples/community/database_transfer.py +++ b/Examples/community/database_transfer.py @@ -1,14 +1,8 @@ -import random -import pymongo -import pandas as pd -from bson import ObjectId -import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np -from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError -import datetime import time +import influxdb_client_3 as InfluxDBClient3 +from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError + class BatchingCallback(object): @@ -22,8 +16,6 @@ def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") - - # InfluxDB connection details token = "" org = "6a841c0c08328fb1" @@ -31,35 +23,34 @@ def retry(self, conf, data: str, exception: InfluxDBError): dbto = "b" url = "eu-central-1-1.aws.cloud2.influxdata.com" measurement = "airSensors" -taglist= [] +taglist = [] callback = BatchingCallback() write_options = WriteOptions(batch_size=5_000, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) # Opening InfluxDB client with a batch size of 5k points or flush interval # of 10k ms and gzip compression with InfluxDBClient3.InfluxDBClient3(token=token, host=url, org=org, - enable_gzip=True, write_client_options=wco) as _client: + enable_gzip=True, write_client_options=wco) as _client: query = f"SHOW TAG KEYS FROM {measurement}" tags = _client.query(query=query, language="influxql", database=dbfrom) tags = tags.to_pydict() taglist = tags['tagKey'] - query = f"SELECT * FROM {measurement}" reader = _client.query(query=query, language="influxql", database=dbfrom, mode="chunk") try: @@ -69,10 +60,8 @@ def retry(self, conf, data: str, exception: InfluxDBError): pd = batch.to_pandas() pd = pd.set_index('time') print(pd) - _client.write(database=dbto, record=pd, data_frame_measurement_name=measurement, data_frame_tag_columns=taglist) + _client.write(database=dbto, record=pd, data_frame_measurement_name=measurement, + data_frame_tag_columns=taglist) time.sleep(2) except StopIteration: print("No more chunks to read") - - - diff --git a/Examples/file-import/csv_write.py b/Examples/file-import/csv_write.py index 0d91087..91bc389 100644 --- a/Examples/file-import/csv_write.py +++ b/Examples/file-import/csv_write.py @@ -1,7 +1,5 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np -from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError +from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError class BatchingCallback(object): @@ -15,29 +13,28 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="INSERT_TOKEN", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python", write_client_options=wco) as client: - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="INSERT_TOKEN", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python", write_client_options=wco) as client: client.write_file( file='./out.csv', timestamp_column='time', tag_columns=["provider", "machineID"]) diff --git a/Examples/file-import/feather_write.py b/Examples/file-import/feather_write.py index 28c7539..aec7b47 100644 --- a/Examples/file-import/feather_write.py +++ b/Examples/file-import/feather_write.py @@ -1,7 +1,5 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np -from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError +from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError class BatchingCallback(object): @@ -15,29 +13,28 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="INSERT_TOKEN", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python", write_client_options=wco) as client: - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="INSERT_TOKEN", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python", write_client_options=wco) as client: client.write_file( file='./out.feather', timestamp_column='time', tag_columns=["provider", "machineID"]) diff --git a/Examples/file-import/json_write.py b/Examples/file-import/json_write.py index c93681b..dc1f3f4 100644 --- a/Examples/file-import/json_write.py +++ b/Examples/file-import/json_write.py @@ -1,7 +1,5 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np -from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError +from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError class BatchingCallback(object): @@ -15,32 +13,28 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="INSERT_TOKEN", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python", write_client_options=wco) as client: - - - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="INSERT_TOKEN", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python", write_client_options=wco) as client: client.write_file( file='./out.json', - timestamp_column='time', tag_columns=["provider", "machineID"], date_unit='ns' ) - + timestamp_column='time', tag_columns=["provider", "machineID"], date_unit='ns') diff --git a/Examples/file-import/orc_write.py b/Examples/file-import/orc_write.py index a4ae04d..07467dd 100644 --- a/Examples/file-import/orc_write.py +++ b/Examples/file-import/orc_write.py @@ -1,6 +1,4 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError @@ -15,32 +13,28 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="INSERT_TOKEN", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python") as client: - - - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="INSERT_TOKEN", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python") as client: client.write_file( file='./out.orc', timestamp_column='time', tag_columns=["provider", "machineID"]) - diff --git a/Examples/file-import/parquet_write.py b/Examples/file-import/parquet_write.py index aaa45c3..544850b 100644 --- a/Examples/file-import/parquet_write.py +++ b/Examples/file-import/parquet_write.py @@ -1,7 +1,5 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np -from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError +from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError class BatchingCallback(object): @@ -15,32 +13,29 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="INSERT_TOKEN", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python", write_client_options=wco) as client: - - - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="INSERT_TOKEN", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python", + write_client_options=wco) as client: client.write_file( file='./out.parquet', timestamp_column='time', tag_columns=["provider", "machineID"]) - diff --git a/Examples/file-import/write_file_parse_options.py b/Examples/file-import/write_file_parse_options.py index 832ac7c..f35269d 100644 --- a/Examples/file-import/write_file_parse_options.py +++ b/Examples/file-import/write_file_parse_options.py @@ -1,6 +1,4 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError, file_parser_options @@ -15,32 +13,31 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") + callback = BatchingCallback() write_options = WriteOptions(batch_size=500, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) - -with InfluxDBClient3.InfluxDBClient3( - token="", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="python", write_client_options=wco) as client: - - + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) + +with InfluxDBClient3.InfluxDBClient3( + token="", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="python", write_client_options=wco) as client: fpo = file_parser_options(columns=["time", "machineID", "vibration"]) client.write_file( file='./out.parquet', - timestamp_column='time', tag_columns=["provider", "machineID"], measurement_name='machine_data', file_parser_options=fpo) - + timestamp_column='time', tag_columns=["provider", "machineID"], measurement_name='machine_data', + file_parser_options=fpo) diff --git a/Examples/flight_options_example.py b/Examples/flight_options_example.py index 2b1edcb..aa1bba9 100644 --- a/Examples/flight_options_example.py +++ b/Examples/flight_options_example.py @@ -1,6 +1,4 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np from influxdb_client_3 import flight_client_options diff --git a/Examples/pokemon-trainer/basic-query.py b/Examples/pokemon-trainer/basic-query.py index ffb2fb9..0c38be7 100644 --- a/Examples/pokemon-trainer/basic-query.py +++ b/Examples/pokemon-trainer/basic-query.py @@ -1,7 +1,4 @@ from influxdb_client_3 import InfluxDBClient3 -import pandas as pd - - client = InfluxDBClient3( token="", @@ -9,11 +6,10 @@ org="6a841c0c08328fb1", database="pokemon-codex") - sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5''' table = client.query(query=sql, language='sql', mode='all') print(table) influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5''' table = client.query(query=influxql, language='influxql', mode='pandas') -print(table) \ No newline at end of file +print(table) diff --git a/Examples/pokemon-trainer/basic-write-errorhandling.py b/Examples/pokemon-trainer/basic-write-errorhandling.py index f536098..ded6dfd 100644 --- a/Examples/pokemon-trainer/basic-write-errorhandling.py +++ b/Examples/pokemon-trainer/basic-write-errorhandling.py @@ -1,31 +1,23 @@ -from influxdb_client_3 import InfluxDBClient3, Point, SYNCHRONOUS, write_client_options, InfluxDBError -import pandas as pd -import numpy as np import datetime - +from influxdb_client_3 import InfluxDBClient3, Point, SYNCHRONOUS, write_client_options wco = write_client_options(write_options=SYNCHRONOUS) - with InfluxDBClient3( - token="", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="pokemon-codex", write_client_options=wco) as client: - + token="", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="pokemon-codex", write_client_options=wco) as client: now = datetime.datetime.now(datetime.timezone.utc) - data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\ - .field("caught", "charizard")\ - .field("level", 10).field("attack", 30)\ - .field("defense", 40).field("hp", 200)\ - .field("speed", 10)\ - .field("type1", "fire").field("type2", "flying")\ - .time(now) - - - + data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1") \ + .field("caught", "charizard") \ + .field("level", 10).field("attack", 30) \ + .field("defense", 40).field("hp", 200) \ + .field("speed", 10) \ + .field("type1", "fire").field("type2", "flying") \ + .time(now) data = [] # Adding first point @@ -45,9 +37,7 @@ .time(now) ) - - - # Bad point + # Bad point data.append( Point("caught") .tag("trainer", "ash") @@ -67,7 +57,7 @@ try: client.write(data) except Exception as e: - print(f"Error writing point") + print(f"Error writing point: {e}") # Good Query try: @@ -75,15 +65,10 @@ print(table) except Exception as e: print(f"Error querying data: {e}") - + # Bad Query - not a sql query try: table = client.query(query='''SELECT * FROM "caught" WHERE time > now() - 5m''', language='sql') print(table) except Exception as e: print(f"Error querying data: {e}") - - - - - diff --git a/Examples/pokemon-trainer/basic-write-writeoptions.py b/Examples/pokemon-trainer/basic-write-writeoptions.py index bb9ea35..fcca6a4 100644 --- a/Examples/pokemon-trainer/basic-write-writeoptions.py +++ b/Examples/pokemon-trainer/basic-write-writeoptions.py @@ -1,30 +1,25 @@ -from influxdb_client_3 import InfluxDBClient3, Point, SYNCHRONOUS, write_client_options -import pandas as pd -import numpy as np import datetime - +from influxdb_client_3 import InfluxDBClient3, Point, SYNCHRONOUS, write_client_options wco = write_client_options(write_options=SYNCHRONOUS) - with InfluxDBClient3( - token="", - host="eu-central-1-1.aws.cloud2.influxdata.com", - org="6a841c0c08328fb1", - database="pokemon-codex", write_client_options=wco, debug=True) as client: - + token="", + host="eu-central-1-1.aws.cloud2.influxdata.com", + org="6a841c0c08328fb1", + database="pokemon-codex", + write_client_options=wco, + debug=True) as client: now = datetime.datetime.now(datetime.timezone.utc) - data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\ - .field("caught", "charizard")\ - .field("level", 10).field("attack", 30)\ - .field("defense", 40).field("hp", 200)\ - .field("speed", 10)\ - .field("type1", "fire").field("type2", "flying")\ - .time(now) - - + data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1") \ + .field("caught", "charizard") \ + .field("level", 10).field("attack", 30) \ + .field("defense", 40).field("hp", 200) \ + .field("speed", 10) \ + .field("type1", "fire").field("type2", "flying") \ + .time(now) try: client.write(data) @@ -83,10 +78,7 @@ .time(now) ) - try: client.write(data) except Exception as e: print(f"Error writing point: {e}") - - diff --git a/Examples/pokemon-trainer/basic-write.py b/Examples/pokemon-trainer/basic-write.py index 19dd0bd..2067dfc 100644 --- a/Examples/pokemon-trainer/basic-write.py +++ b/Examples/pokemon-trainer/basic-write.py @@ -1,6 +1,6 @@ -from influxdb_client_3 import InfluxDBClient3, Point import datetime +from influxdb_client_3 import InfluxDBClient3, Point client = InfluxDBClient3( token="mGbL-OJ2kxYqvbIL9jQOOg2VJLhf16hh-xn-XJe3RUKrI5cewOAy80L5cVIzG0vh7dLLckZkpYfvExgoMBXLFA==", @@ -10,15 +10,13 @@ now = datetime.datetime.now(datetime.timezone.utc) -data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\ - .field("caught", "charizard")\ - .field("level", 10).field("attack", 30)\ - .field("defense", 40).field("hp", 200)\ - .field("speed", 10)\ - .field("type1", "fire").field("type2", "flying")\ - .time(now) - - +data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1") \ + .field("caught", "charizard") \ + .field("level", 10).field("attack", 30) \ + .field("defense", 40).field("hp", 200) \ + .field("speed", 10) \ + .field("type1", "fire").field("type2", "flying") \ + .time(now) try: client.write(data) @@ -77,10 +75,7 @@ .time(now) ) - try: client.write(data) except Exception as e: print(f"Error writing point: {e}") - - diff --git a/Examples/pokemon-trainer/pandas-write.py b/Examples/pokemon-trainer/pandas-write.py index 5b649c8..a4fe194 100644 --- a/Examples/pokemon-trainer/pandas-write.py +++ b/Examples/pokemon-trainer/pandas-write.py @@ -1,9 +1,8 @@ -from influxdb_client_3 import InfluxDBClient3 -import pandas as pd -import numpy as np -import datetime import random +import pandas as pd + +from influxdb_client_3 import InfluxDBClient3 client = InfluxDBClient3( token="", @@ -11,13 +10,14 @@ org="6a841c0c08328fb1", database="pokemon-codex") -now = pd.Timestamp.now(tz='UTC').floor('ms') +now = pd.Timestamp.now(tz='UTC').floor('ms') # Lists of possible trainers trainers = ["ash", "brock", "misty", "gary", "jessie", "james"] # Read the CSV into a DataFrame -pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") +pokemon_df = pd.read_csv( + "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") # noqa: E501 # Creating an empty list to store the data data = [] @@ -31,17 +31,17 @@ # Generating random data for i in range(num_entries): trainer = random.choice(trainers) - + # Randomly select a row from pokemon_df random_pokemon = pokemon_df.sample().iloc[0] caught = random_pokemon['Name'] - + # Count the number of times this trainer has caught this Pokémon if (trainer, caught) in trainer_pokemon_counts: trainer_pokemon_counts[(trainer, caught)] += 1 else: trainer_pokemon_counts[(trainer, caught)] = 1 - + # Get the number for this combination of trainer and Pokémon num = trainer_pokemon_counts[(trainer, caught)] @@ -67,10 +67,8 @@ # Print the DataFrame print(caught_pokemon_df) - - try: client.write(caught_pokemon_df, data_frame_measurement_name='caught', - data_frame_tag_columns=['trainer', 'id', 'num']) + data_frame_tag_columns=['trainer', 'id', 'num']) except Exception as e: - print(f"Error writing point: {e}") \ No newline at end of file + print(f"Error writing point: {e}") diff --git a/Examples/pokemon-trainer/write-batching-flight-calloptions.py b/Examples/pokemon-trainer/write-batching-flight-calloptions.py index de11010..25fca40 100644 --- a/Examples/pokemon-trainer/write-batching-flight-calloptions.py +++ b/Examples/pokemon-trainer/write-batching-flight-calloptions.py @@ -1,8 +1,8 @@ -from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options -import pandas as pd -import numpy as np import random +import pandas as pd + +from influxdb_client_3 import InfluxDBClient3, InfluxDBError, WriteOptions, write_client_options now = pd.Timestamp.now(tz='UTC').floor('ms') two_days_ago = now @@ -19,35 +19,36 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") -callback = BatchingCallback() +callback = BatchingCallback() write_options = WriteOptions(batch_size=100, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) client = InfluxDBClient3( token="", host="eu-central-1-1.aws.cloud2.influxdata.com", org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco) -now = pd.Timestamp.now(tz='UTC').floor('ms') +now = pd.Timestamp.now(tz='UTC').floor('ms') # Lists of possible trainers trainers = ["ash", "brock", "misty", "gary", "jessie", "james"] # Read the CSV into a DataFrame -pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") +pokemon_df = pd.read_csv( + "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") # noqa: E501 # Creating an empty list to store the data data = [] @@ -61,17 +62,17 @@ def retry(self, conf, data: str, exception: InfluxDBError): # Generating random data for i in range(num_entries): trainer = random.choice(trainers) - + # Randomly select a row from pokemon_df random_pokemon = pokemon_df.sample().iloc[0] caught = random_pokemon['Name'] - + # Count the number of times this trainer has caught this Pokémon if (trainer, caught) in trainer_pokemon_counts: trainer_pokemon_counts[(trainer, caught)] += 1 else: trainer_pokemon_counts[(trainer, caught)] = 1 - + # Get the number for this combination of trainer and Pokémon num = trainer_pokemon_counts[(trainer, caught)] @@ -95,15 +96,13 @@ def retry(self, conf, data: str, exception: InfluxDBError): caught_pokemon_df = pd.DataFrame(data).set_index('timestamp') # Print the DataFrame -#print(caught_pokemon_df) - - - +# print(caught_pokemon_df) # Query try: - table = client.query(query='''SELECT * FROM caught WHERE time >= now() - 30m''', database='pokemon-codex', timeout=90.0, language='sql', mode='pandas') + table = client.query(query='''SELECT * FROM caught WHERE time >= now() - 30m''', database='pokemon-codex', + timeout=90.0, language='sql', mode='pandas') print(table) except Exception as e: print(f"Error querying points: {e}") diff --git a/Examples/pokemon-trainer/write-batching.py b/Examples/pokemon-trainer/write-batching.py index ea05a39..e24310a 100644 --- a/Examples/pokemon-trainer/write-batching.py +++ b/Examples/pokemon-trainer/write-batching.py @@ -1,7 +1,9 @@ -from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options -import pandas as pd import random +import pandas as pd + +from influxdb_client_3 import InfluxDBClient3, InfluxDBError, WriteOptions, write_client_options + class BatchingCallback(object): @@ -14,22 +16,22 @@ def error(self, conf, data: str, exception: InfluxDBError): def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") -callback = BatchingCallback() +callback = BatchingCallback() write_options = WriteOptions(batch_size=100, - flush_interval=10_000, - jitter_interval=2_000, - retry_interval=5_000, - max_retries=5, - max_retry_delay=30_000, - exponential_base=2) + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options - ) + error_callback=callback.error, + retry_callback=callback.retry, + WriteOptions=write_options + ) client = InfluxDBClient3( token="", @@ -37,13 +39,13 @@ def retry(self, conf, data: str, exception: InfluxDBError): org="6a841c0c08328fb1", database="pokemon-codex", enable_gzip=True, write_client_options=wco) -now = pd.Timestamp.now(tz='UTC').floor('ms') +now = pd.Timestamp.now(tz='UTC').floor('ms') # Lists of possible trainers trainers = ["ash", "brock", "misty", "gary", "jessie", "james"] # Read the CSV into a DataFrame -pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") +pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") # noqa: E501 # Creating an empty list to store the data data = [] @@ -57,17 +59,17 @@ def retry(self, conf, data: str, exception: InfluxDBError): # Generating random data for i in range(num_entries): trainer = random.choice(trainers) - + # Randomly select a row from pokemon_df random_pokemon = pokemon_df.sample().iloc[0] caught = random_pokemon['Name'] - + # Count the number of times this trainer has caught this Pokémon if (trainer, caught) in trainer_pokemon_counts: trainer_pokemon_counts[(trainer, caught)] += 1 else: trainer_pokemon_counts[(trainer, caught)] = 1 - + # Get the number for this combination of trainer and Pokémon num = trainer_pokemon_counts[(trainer, caught)] @@ -93,9 +95,8 @@ def retry(self, conf, data: str, exception: InfluxDBError): # Print the DataFrame print(caught_pokemon_df) - try: client.write(caught_pokemon_df, data_frame_measurement_name='caught', - data_frame_tag_columns=['trainer', 'id', 'num']) + data_frame_tag_columns=['trainer', 'id', 'num']) except Exception as e: - print(f"Error writing point: {e}") \ No newline at end of file + print(f"Error writing point: {e}") diff --git a/Examples/query_type.py b/Examples/query_type.py index d79ac87..e9b28ae 100644 --- a/Examples/query_type.py +++ b/Examples/query_type.py @@ -1,7 +1,4 @@ import influxdb_client_3 as InfluxDBClient3 -import pandas as pd -import numpy as np - client = InfluxDBClient3.InfluxDBClient3( token="", @@ -56,5 +53,3 @@ print("reader:") for batch in reader: print(batch.to_pandas()) - - diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 0bfec98..6f02d4c 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,20 +1,24 @@ -import urllib.parse, json +import json +import urllib.parse + import pyarrow as pa -from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point -from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, PointSettings -from influxdb_client_3.write_client.domain.write_precision import WritePrecision -from influxdb_client_3.write_client.client.exceptions import InfluxDBError from pyarrow.flight import FlightClient, Ticket, FlightCallOptions + from influxdb_client_3.read_file import UploadFile -import urllib.parse +from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point +from influxdb_client_3.write_client.client.exceptions import InfluxDBError +from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ + PointSettings +from influxdb_client_3.write_client.domain.write_precision import WritePrecision + try: import polars as pl + polars = True except ImportError: polars = False - def write_client_options(**kwargs): """ Function for providing additional arguments for the WriteApi client. @@ -24,9 +28,11 @@ def write_client_options(**kwargs): """ return kwargs + def default_client_options(**kwargs): return kwargs + def flight_client_options(**kwargs): """ Function for providing additional arguments for the FlightClient. @@ -36,6 +42,7 @@ def flight_client_options(**kwargs): """ return kwargs + def file_parser_options(**kwargs): """ Function for providing additional arguments for the file parser. @@ -43,7 +50,7 @@ def file_parser_options(**kwargs): :param kwargs: Additional arguments for the file parser. :return: dict with the arguments. """ - return kwargs + return kwargs def _deep_merge(target, source): @@ -67,6 +74,7 @@ def _deep_merge(target, source): target = source return target + class InfluxDBClient3: def __init__( self, @@ -99,11 +107,12 @@ def __init__( self._org = org if org is not None else "default" self._database = database self._token = token - self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS) - + self._write_client_options = write_client_options if write_client_options is not None \ + else default_client_options(write_options=SYNCHRONOUS) + # Parse the host input parsed_url = urllib.parse.urlparse(host) - + # Determine the protocol (scheme), hostname, and port scheme = parsed_url.scheme if parsed_url.scheme else "https" hostname = parsed_url.hostname if parsed_url.hostname else host @@ -118,10 +127,10 @@ def __init__( token=self._token, org=self._org, **kwargs) - + self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options) self._flight_client_options = flight_client_options or {} - + if query_port_overwrite is not None: port = query_port_overwrite self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options) @@ -134,7 +143,7 @@ def _merge_options(self, defaults, custom={}): return defaults return _deep_merge(defaults, {key: value for key, value in custom.items()}) - def write(self, record=None, database=None ,**kwargs): + def write(self, record=None, database=None, **kwargs): """ Write data to InfluxDB. @@ -151,9 +160,9 @@ def write(self, record=None, database=None ,**kwargs): self._write_api.write(bucket=database, record=record, **kwargs) except InfluxDBError as e: raise e - - def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, file_parser_options=None ,**kwargs): + def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, + file_parser_options=None, **kwargs): """ Write data from a file to InfluxDB. @@ -177,10 +186,10 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co try: table = UploadFile(file, file_parser_options).load_file() df = table.to_pandas() if isinstance(table, pa.Table) else table - self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, **kwargs) + self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, + **kwargs) except Exception as e: raise e - def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs): # This function is factored out for clarity. @@ -204,9 +213,8 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column data_frame_measurement_name=measurement_name, data_frame_tag_columns=tag_columns, data_frame_timestamp_column=timestamp_column, **kwargs) - - - def query(self, query, language="sql", mode="all", database=None,**kwargs ): + + def query(self, query, language="sql", mode="all", database=None, **kwargs): """ Query data from InfluxDB. @@ -223,12 +231,10 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): """ if mode == "polars" and polars is False: raise ImportError("Polars is not installed. Please install it with `pip install polars`.") - - if database is None: database = self._database - + try: # Create an authorization header optargs = { @@ -237,7 +243,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): } opts = self._merge_options(optargs, kwargs) _options = FlightCallOptions(**opts) - + ticket_data = {"database": database, "sql_query": query, "query_type": language} ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._flight_client.do_get(ticket, _options) @@ -249,7 +255,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ): "chunk": lambda: flight_reader, "reader": flight_reader.to_reader, "schema": lambda: flight_reader.schema - + }.get(mode, flight_reader.read_all) return mode_func() if callable(mode_func) else mode_func @@ -267,7 +273,8 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - + + __all__ = [ "InfluxDBClient3", "Point", diff --git a/influxdb_client_3/read_file.py b/influxdb_client_3/read_file.py index 0cc24ab..6f9f5e5 100644 --- a/influxdb_client_3/read_file.py +++ b/influxdb_client_3/read_file.py @@ -12,6 +12,7 @@ class UploadFile: """ Class for uploading and reading different types of files. """ + def __init__(self, file, file_parser_options=None): """ Initialize an UploadFile instance. @@ -44,7 +45,7 @@ def load_file(self): else: raise ValueError("Unsupported file type") - def load_feather(self, file ): + def load_feather(self, file): """ Load a Feather file. @@ -99,6 +100,7 @@ def load_json(self, file): try: import pandas as pd except ImportError: - raise ImportError("Pandas is required for write_file(). Please install it using 'pip install pandas' or 'pip install influxdb3-python[pandas]'") - - return pd.read_json(file, **self._kwargs) \ No newline at end of file + raise ImportError("Pandas is required for write_file(). Please install it using 'pip install pandas' or " + "'pip install influxdb3-python[pandas]'") + + return pd.read_json(file, **self._kwargs) diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index 7b5e226..cc22ff8 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -267,7 +267,7 @@ def __deserialize(self, data, klass): if data is None: return None - if type(klass) == str: + if klass is str: if klass.startswith('list['): sub_kls = re.match(r'list\[(.*)\]', klass).group(1) return [self.__deserialize(sub_data, sub_kls) @@ -348,13 +348,13 @@ def call_api(self, resource_path, method, _preload_content, _request_timeout, urlopen_kw) else: thread = self.pool.apply_async(self.__call_api, (resource_path, - method, path_params, query_params, - header_params, body, - post_params, files, - response_type, auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, _request_timeout, urlopen_kw)) + method, path_params, query_params, + header_params, body, + post_params, files, + response_type, auth_settings, + _return_http_data_only, + collection_formats, + _preload_content, _request_timeout, urlopen_kw)) return thread def request(self, method, url, query_params=None, headers=None, diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index a661505..82c1b74 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -5,16 +5,13 @@ import configparser import logging import os -from datetime import datetime, timedelta -from typing import List, Generator, Any, Union, Iterable, AsyncGenerator - -from urllib3 import HTTPResponse +from typing import Iterable +from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer, \ + PolarsDataframeSerializer from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.write_client.service.write_service import WriteService - -from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer, PolarsDataframeSerializer from influxdb_client_3.write_client.rest import _UTF_8_encoding +from influxdb_client_3.write_client.service.write_service import WriteService try: import dataclasses @@ -208,7 +205,6 @@ def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): profilers=profilers, **kwargs) - class _BaseWriteApi(object): def __init__(self, influxdb_client, point_settings=None): self._influxdb_client = influxdb_client @@ -260,7 +256,6 @@ def _serialize(self, record, write_precision, payload, **kwargs): serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) self._serialize(serializer.serialize(), write_precision, payload, **kwargs) - elif hasattr(record, "_asdict"): # noinspection PyProtectedMember self._serialize(record._asdict(), write_precision, payload, **kwargs) @@ -271,7 +266,6 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(item, write_precision, payload, **kwargs) - class _Configuration(Configuration): def __init__(self): Configuration.__init__(self) diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 1172375..ece7795 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -3,8 +3,6 @@ from __future__ import absolute_import import logging -import warnings - from influxdb_client_3.write_client.client._base import _BaseClient from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions, PointSettings @@ -284,7 +282,6 @@ def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): """ return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs) - def close(self): """Shutdown the client.""" self.__del__() @@ -294,8 +291,3 @@ def __del__(self): if self.api_client: self.api_client.__del__() self.api_client = None - - - - - diff --git a/influxdb_client_3/write_client/client/write/dataframe_serializer.py b/influxdb_client_3/write_client/client/write/dataframe_serializer.py index 7ff0050..febe8bb 100644 --- a/influxdb_client_3/write_client/client/write/dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/dataframe_serializer.py @@ -9,7 +9,8 @@ import re from influxdb_client_3.write_client.domain import WritePrecision -from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION +from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \ + DEFAULT_WRITE_PRECISION logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer') @@ -195,9 +196,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION sep = '' if len(field_indexes) == 0 else ',' - if (issubclass(value.type, np.integer) - or issubclass(value.type, np.floating) - or issubclass(value.type, np.bool_)): + if (issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or + issubclass(value.type, np.bool_)): suffix = 'i' if issubclass(value.type, np.integer) else '' if null_columns.iloc[index]: field_value = ( @@ -301,8 +301,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION :key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column """ - - + self.data_frame = data_frame self.point_settings = point_settings self.precision = precision @@ -315,7 +314,9 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION self.column_indices = {name: index for index, name in enumerate(data_frame.columns)} if self.timestamp_column is None or self.timestamp_column not in self.column_indices: - raise ValueError(f"Timestamp column {self.timestamp_column} not found in DataFrame. Please define a valid timestamp column.") + raise ValueError( + f"Timestamp column {self.timestamp_column} not found in DataFrame. Please define a valid timestamp " + f"column.") # # prepare chunks @@ -325,20 +326,19 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION self.chunk_size = chunk_size else: self.number_of_chunks = None - - def escape_key(self,value): + + def escape_key(self, value): return str(value).translate(_ESCAPE_KEY) - - def escape_value(self,value): + + def escape_value(self, value): return str(value).translate(_ESCAPE_STRING) - def to_line_protocol(self, row): # Filter out None or empty values for tags tags = "" - + tags = ",".join( - f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}' + f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}' for col in self.tag_columns if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" ) @@ -352,23 +352,21 @@ def to_line_protocol(self, row): if tags and default_tags: tags += "," tags += default_tags - - - # add escape symbols for special characters to tags fields = ",".join( - f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]], str) - else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]], bool) # Check for bool first + f"{col}=\"{self.escape_value(row[self.column_indices[col]])}\"" if isinstance(row[self.column_indices[col]], + str) + else f"{col}={str(row[self.column_indices[col]]).lower()}" if isinstance(row[self.column_indices[col]], + bool) # Check for bool first else f"{col}={row[self.column_indices[col]]}i" if isinstance(row[self.column_indices[col]], int) else f"{col}={row[self.column_indices[col]]}" for col in self.column_indices - if col not in self.tag_columns + [self.timestamp_column] - and row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" + if col not in self.tag_columns + [self.timestamp_column] and + row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" ) - # Access the Unix timestamp timestamp = row[self.column_indices[self.timestamp_column]] if tags != "": @@ -378,10 +376,9 @@ def to_line_protocol(self, row): return line_protocol - def serialize(self, chunk_idx: int = None): from ...extras import pl - + df = self.data_frame # Check if the timestamp column is already an integer @@ -391,16 +388,19 @@ def serialize(self, chunk_idx: int = None): else: # Convert timestamp to Unix timestamp based on specified precision if self.precision in [None, 'ns']: - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column)) + df = df.with_columns( + pl.col(self.timestamp_column).dt.epoch(time_unit="ns").alias(self.timestamp_column)) elif self.precision == 'us': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column)) + df = df.with_columns( + pl.col(self.timestamp_column).dt.epoch(time_unit="us").alias(self.timestamp_column)) elif self.precision == 'ms': - df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column)) + df = df.with_columns( + pl.col(self.timestamp_column).dt.epoch(time_unit="ms").alias(self.timestamp_column)) elif self.precision == 's': df = df.with_columns(pl.col(self.timestamp_column).dt.epoch(time_unit="s").alias(self.timestamp_column)) else: raise ValueError(f"Unsupported precision: {self.precision}") - + if chunk_idx is None: chunk = df else: @@ -408,18 +408,13 @@ def serialize(self, chunk_idx: int = None): chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size] # Apply the UDF to each row - line_protocol_expr = chunk.apply(self.to_line_protocol,return_dtype=pl.Object) + line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object) lp = line_protocol_expr['map'].to_list() - return lp - - - - def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs): """ Serialize DataFrame into LineProtocols. @@ -436,6 +431,7 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W """ # noqa: E501 return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize() + def polars_data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs): """ Serialize DataFrame into LineProtocols. @@ -450,4 +446,4 @@ def polars_data_frame_to_list_of_points(data_frame, point_settings, precision=DE or other formats and types supported by `pandas.to_datetime `_ - ``DataFrame`` :key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column - ``DataFrame`` """ # noqa: E501 - return PolarsDataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize() \ No newline at end of file + return PolarsDataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize() diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index a9ef791..2123e90 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -19,14 +19,14 @@ from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS from influxdb_client_3.write_client.client.util.helpers import get_org_query_param -from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer, PolarsDataframeSerializer +from influxdb_client_3.write_client.client.write.dataframe_serializer import (DataframeSerializer, + PolarsDataframeSerializer) from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION from influxdb_client_3.write_client.client.write.retry import WritesRetry from influxdb_client_3.write_client.rest import _UTF_8_encoding logger = logging.getLogger('influxdb_client_3.write_client.client.write_api') - if _HAS_DATACLASS: import dataclasses from dataclasses import dataclass @@ -460,10 +460,11 @@ def _write_batching(self, bucket, org, data, elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), precision, **kwargs) - + elif 'polars' in str(type(data)): - serializer = PolarsDataframeSerializer(data, self._point_settings, precision, self._write_options.batch_size, - **kwargs) + serializer = PolarsDataframeSerializer(data, + self._point_settings, precision, + self._write_options.batch_size, **kwargs) for chunk_idx in range(serializer.number_of_chunks): self._write_batching(bucket, org, serializer.serialize(chunk_idx), @@ -477,7 +478,6 @@ def _write_batching(self, bucket, org, data, serializer.serialize(chunk_idx), precision, **kwargs) - elif hasattr(data, "_asdict"): # noinspection PyProtectedMember self._write_batching(bucket, org, data._asdict(), precision, **kwargs) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index a9df7d1..5adfc1e 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,9 @@ import unittest -from unittest.mock import patch, Mock +from unittest.mock import patch + from influxdb_client_3 import InfluxDBClient3 + class TestInfluxDBClient3(unittest.TestCase): @patch('influxdb_client_3._InfluxDBClient') @@ -25,5 +27,6 @@ def test_init(self): self.assertEqual(self.client._write_api, self.mock_write_api.return_value) self.assertEqual(self.client._flight_client, self.mock_flight_client.return_value) + if __name__ == '__main__': unittest.main()