Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions Examples/pokemon-trainer/basic-write-errorhandling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from influxdb_client_3 import InfluxDBClient3, Point, SYNCHRONOUS, write_client_options, InfluxDBError
import pandas as pd
import numpy as np
import datetime



wco = write_client_options(write_options=SYNCHRONOUS)


with InfluxDBClient3(
token="",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="pokemon-codex", write_client_options=wco) as client:

now = datetime.datetime.now(datetime.timezone.utc)

data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\
.field("caught", "charizard")\
.field("level", 10).field("attack", 30)\
.field("defense", 40).field("hp", 200)\
.field("speed", 10)\
.field("type1", "fire").field("type2", "flying")\
.time(now)




data = []
# Adding first point
data.append(
Point("caught")
.tag("trainer", "ash")
.tag("id", "0006")
.tag("num", "1")
.field("caught", "charizard")
.field("level", 10)
.field("attack", 30)
.field("defense", 40)
.field("hp", 200)
.field("speed", 10)
.field("type1", "fire")
.field("type2", "flying")
.time(now)
)



# Bad point
data.append(
Point("caught")
.tag("trainer", "ash")
.tag("id", "0008")
.tag("num", "3")
.field("caught", "squirtle")
.field("level", 13)
.field("attack", 29)
.field("defense", 40)
.field("hp", 180)
.field("speed", 13)
.field("type1", "water")
.field("type2", None)
.time(now)
)

try:
client.write(data)
except Exception as e:
print(f"Error writing point")

# Good Query
try:
table = client.query(query='''SELECT * FROM "caught" WHERE time > now() - 5m''', language='influxql')
print(table)
except Exception as e:
print(f"Error querying data: {e}")

# Bad Query - not a sql query
try:
table = client.query(query='''SELECT * FROM "caught" WHERE time > now() - 5m''', language='sql')
print(table)
except Exception as e:
print(f"Error querying data: {e}")





37 changes: 21 additions & 16 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
org=self._org,
**kwargs)

self._write_api = _WriteApi(self._client, **self._write_client_options)
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
self._flight_client_options = flight_client_options or {}
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)

Expand All @@ -75,7 +75,8 @@ def write(self, record=None, **kwargs):
try:
self._write_api.write(bucket=self._database, record=record, **kwargs)
except InfluxDBError as e:
print(f"InfluxDB Error: {e}")
raise e


def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', **kwargs):
"""
Expand All @@ -96,7 +97,8 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
df = table.to_pandas() if isinstance(table, pa.Table) else table
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column)
except Exception as e:
print(f"Error writing file: {e}")
raise e


def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column):
# This function is factored out for clarity.
Expand Down Expand Up @@ -133,19 +135,22 @@ def query(self, query, language="sql", mode="all"):
:type mode: str
:return: The queried data.
"""
ticket_data = {"database": self._database, "sql_query": query, "query_type": language}
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
flight_reader = self._flight_client.do_get(ticket, self._options)

mode_func = {
"all": flight_reader.read_all,
"pandas": flight_reader.read_pandas,
"chunk": lambda: flight_reader,
"reader": flight_reader.to_reader,
"schema": lambda: flight_reader.schema
}.get(mode, flight_reader.read_all)

return mode_func() if callable(mode_func) else mode_func
try:
ticket_data = {"database": self._database, "sql_query": query, "query_type": language}
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
flight_reader = self._flight_client.do_get(ticket, self._options)

mode_func = {
"all": flight_reader.read_all,
"pandas": flight_reader.read_pandas,
"chunk": lambda: flight_reader,
"reader": flight_reader.to_reader,
"schema": lambda: flight_reader.schema
}.get(mode, flight_reader.read_all)

return mode_func() if callable(mode_func) else mode_func
except Exception as e:
raise e

def close(self):
"""Close the client and clean up resources."""
Expand Down