From 06472d12f585691fec407318866beeeb51e30b55 Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Sat, 31 May 2025 15:31:43 -0700 Subject: [PATCH 1/7] WIP API and example for livestreams --- examples/test_livestream.py | 92 ++++++++++++++ podium_api/livestream.py | 244 ++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 3 files changed, 337 insertions(+) create mode 100644 examples/test_livestream.py create mode 100644 podium_api/livestream.py diff --git a/examples/test_livestream.py b/examples/test_livestream.py new file mode 100644 index 0000000..036f433 --- /dev/null +++ b/examples/test_livestream.py @@ -0,0 +1,92 @@ +import argparse +from time import sleep + +from kivy.app import App +from kivy.lang import Builder +from kivy.uix.boxlayout import BoxLayout + +from podium_api import register_podium_application +from podium_api.asyncreq import get_json_header_token +from podium_api.livestream import PodiumLivestream +from podium_api.login import make_login_post + + +def parse_args(): + parser = argparse.ArgumentParser(description="Parse app connection parameters.") + parser.add_argument("--app_id", required=True, help="Application ID") + parser.add_argument("--api_uri", required=True, help="Application API URI") + parser.add_argument("--ws_uri", required=True, help="WebSocket API URI") + parser.add_argument("--username", required=True, help="Username") + parser.add_argument("--password", required=True, help="Password") + return parser.parse_args() + + +args = parse_args() + +KV = """ +: + orientation: 'vertical' + spacing: 10 + padding: 10 + + TextInput: + id: text_box + text: "" + multiline: True + size_hint_y: 0.9 + + Button: + text: "Start" + size_hint_y: 0.1 + on_press: app.on_button_click() +""" + + +class MyWidget(BoxLayout): + pass + + +class MyApp(App): + def build(self): + Builder.load_string(KV) + return MyWidget() + + def on_connection_open(self): + print("connection opened") + self._livestream.list_telemetry_sessions() + + def on_connection_close(self): + print("connection closed") + + def on_list_telemetry_sessions(self, body): + print(f"telemetry sessions: {body}") + + def start_livestream(self, token): + self._livestream = livestream = PodiumLivestream() + livestream.set_connection_open_listener(self.on_connection_open) + livestream.set_connection_close_listener(self.on_connection_close) + livestream.set_list_telemetry_sessions_listener(self.on_list_telemetry_sessions) + + header = get_json_header_token(token) + livestream.open_connection(args.ws_uri, header) + + def login_success(self, token): + print(f"login success: {token}") + self.start_livestream(token) + + def login_failure(self, error_type, results, data): + print(f"login failed! error_type: {error_type}; results: {results}; data: {data}") + + def login(self): + register_podium_application(args.app_id, "", args.api_uri) + + make_login_post( + args.username, args.password, success_callback=self.login_success, failure_callback=self.login_failure + ) + + def on_button_click(self): + self.login() + + +if __name__ == "__main__": + MyApp().run() diff --git a/podium_api/livestream.py b/podium_api/livestream.py new file mode 100644 index 0000000..446fb1c --- /dev/null +++ b/podium_api/livestream.py @@ -0,0 +1,244 @@ +import json +import logging +import ssl +import time +import uuid +from threading import Thread + +import websocket +from websocket import WebSocketConnectionClosedException + +logger = logging.getLogger("PodiumLivestream") +logger.setLevel(logging.DEBUG) + + +class PodiumLivestream: + PING_DELAY = 5.0 + WEBSOCKET_PING_INTERVAL = 10 + WEBSOCKET_PING_TIMEOUT = 5 + WEBSOCKET_RECONNECT_DELAY = 1.0 + + def __init__(self): + self.reply_uuid = str(uuid.uuid1().hex) + self._running = False + self._ws = None + self._telemetry_stream_started_listener = None + self._telemetry_stream_ended_listener = None + self._list_telemetry_sessions_listener = None + self._sensor_data_listener = None + self._connection_open_listener = None + self._connection_close_listener = None + self._connection_warning_listener = None + self._connection_error_listener = None + self._alertmessage_listener = None + self._alertmessage_ack_listener = None + + def set_telemetry_stream_started_listener(self, listener): + self._telemetry_stream_started_listener = listener + + def set_telemetry_stream_ended_listener(self, listener): + self._telemetry_stream_ended = listener + + def set_list_telemetry_sessions_listener(self, listener): + self._list_telemetry_sessions_listener = listener + + def set_sensor_data_listener(self, listener): + self._list_telemetry_sessions_listener = listener + + def set_connection_open_listener(self, listener): + self._connection_open_listener = listener + + def set_connection_close_listener(self, listener): + self._connection_close_listener = listener + + def set_connection_warning_listener(self, listener): + self._connection_warning_listener = listener + + def set_connection_error_listener(self, listener): + self._connection_error_listener = listener + + def set_alertmessage_listener(self, listener): + self._alertmessage_listener = listener + + def set_alertmessage_ack_listener(self, listener): + self._alertmessage_ack_listener = listener + + def on_telemetry_stream_started(self, body): + eventdevice_id = body.get("eventDeviceId") + device_id = body.get("deviceId") + logger.info("PodiumLivestream: Telemetry stream started: {} {}".format(eventdevice_id, device_id)) + if listener := self._telemetry_stream_started_listener: + listener(device_id, eventdevice_id) + + def on_telemetry_stream_ended(self, body): + eventdevice_id = body.get("eventDeviceId") + device_id = body.get("deviceId") + logger.info("PodiumLivestream: telemetry stream ended: {} {}".format(eventdevice_id, device_id)) + if listener := self._telemetry_stream_ended_listener: + listener(device_id, eventdevice_id) + + def on_list_telemetry_sessions(self, body): + logger.debug("PodiumLivestream: ws_list_telemetry_sessions: {}".format(body)) + if listener := self._list_telemetry_sessions_listener: + listener(body) + + def on_sensor_data(self, body): + logger.debug("on sensordata %s", body) + if listener := self._sensor_data_listener: + listener(body) + + def on_ws_event(self, ws, body): + logger.debug("on event: {}".format(body)) + + def _on_ws_error(self, ws, error): + logger.error("PodiumLivestream: Websocket error: {}".format(error)) + if listener := self._connection_error_listener: + listener(error) + + def _on_ws_close(self, ws, close_status_code, close_msg): + logger.info("PodiumLivestream: Websocket closed - status: %s - %s", close_status_code, close_msg) + if listener := self._connection_close_listener: + listener() + + def _on_connection_warning(self): + logger.warning("Reconnecting: Your local internet is unstable") + if listener := self._connection_warning_listener: + listener() + + def _on_connection_open(self): + logger.info("Connection opened") + if listener := self._connection_open_listener: + listener() + + def on_alertmessage(self, body): + logger.info("on alertmessage %s", body) + if listener := self._alertmessage_listener: + listener(body) + + def on_alertmessageack(self, body): + logger.info("on alertmessage ack %s", body) + if listener := self._alertmessage_ack_listener: + listener(body) + + def _websocket_send(self, msg): + if not self._ws: + logger.error("PodiumLivestream: could not send websocket message, websocket does not exist") + return + try: + self._ws.send(msg) + except WebSocketConnectionClosedException: + logger.error("PodiumLivestream: Web socket is closed") + except Exception as e: + logger.error("PodiumLivestream: Failed to send websocket message {}".format(e)) + + def unregister_sensor_data(self, eventdevice_id): + logger.info("PodiumLivestream: unregistering sensor data for eventdevice_id {}".format(eventdevice_id)) + self._websocket_send('{{"type":"unregister","address":"event.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"unregister","address":"sensorData.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"unregister","address":"overviewData.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"unregister","address":"alertmessage.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"unregister","address":"alertmsgAck.{}"}}'.format(eventdevice_id)) + + def register_sensor_data(self, eventdevice_id): + logger.info("PodiumLivestream: registering sensor data for eventdevice_id {}".format(eventdevice_id)) + self._websocket_send('{{"type":"register","address":"event.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"register","address":"sensorData.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"register","address":"overviewData.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"register","address":"alertmessage.{}"}}'.format(eventdevice_id)) + self._websocket_send('{{"type":"register","address":"alertmsgAck.{}"}}'.format(eventdevice_id)) + + def list_telemetry_sessions(self): + self._websocket_send( + '{{"type":"send","address":"listTelemetryStreamSessions","body":{{}},"replyAddress":"{}"}}'.format( + self.reply_uuid + ) + ) + + def open_connection(self, url, header): + def ws_run_forever(): + + # Enable this for viewing trace information + # websocket.enableTrace(True) + + while True: # TRY TO RECONNECT + try: + self._ws = ws = websocket.WebSocketApp( + url, + on_open=self._on_ws_open, + on_message=self._on_ws_message, + on_error=self._on_ws_error, + on_close=self._on_ws_close, + header=header, + ) + logger.info("PodiumLiveStream: Starting livestream") + ws.run_forever( + sslopt={"cert_reqs": ssl.CERT_NONE}, + ping_interval=self.WEBSOCKET_PING_INTERVAL, + ping_timeout=self.WEBSOCKET_PING_TIMEOUT, + ) + + self._on_connection_warning() + logger.info("PodiumLiveStream: After livestream") + time.sleep(self.WEBSOCKET_RECONNECT_DELAY) + except Exception as ex: + logger.warn("PodiumLiveStream: Exception after livestream connection: {}".format(ex)) + + wst = Thread(target=ws_run_forever) + wst.daemon = True + wst.start() + + def close_connection(self): + self._running = False + self._ws.close() + + def dispatch_msg(self, ws, address, body): + if address.startswith("sensorData"): + self.on_sensor_data(body) + elif address.startswith("event"): + self.on_ws_event(body) + elif address == "telemetryStreamStarted": + self.on_telemetry_stream_started(body) + elif address == "telemetryStreamEnded": + self.on_telemetry_stream_ended(body) + elif address == self.reply_uuid: + self.on_list_telemetry_sessions(body) + elif address.startswith("alertmessage"): + self.on_alertmessage(body) + elif address.startswith("alertmsgAck"): + self.on_alertmessage_ack(body) + + def _on_ws_message(self, ws, message): + logger.debug("PodiumLivestream: %s", message) + msg = json.loads(message) + address = msg.get("address") + body = msg.get("body") + if None not in [address, body]: + self.dispatch_msg(ws, address, body) + else: + logger.error("PodiumLivestream: Malformed message received: {}".format(message)) + + def _on_ws_open(self, ws): + logger.info("PodiumLivestream: Websocket open") + + def run(*args): + ws.send('{"type":"register","address":"telemetryStreamStarted"}') + ws.send('{"type":"register","address":"telemetryStreamEnded"}') + + while self._running: + try: + logger.debug("PodiumLivestream: websocket ping") + time.sleep(PodiumLivestream.PING_DELAY) + ws.send('{ "type": "ping" }') + except WebSocketConnectionClosedException: + logger.error("PodiumLivestream: Websocket closed") + break + except Exception as e: + logger.error("PodiumLivestream: failed to send ping: %s (%s)", e, type(e)) + + logger.info("PodiumLivestream: Ping thread exiting") + + self._running = True + t = Thread(target=run) + t.daemon = True + t.start() + self._on_connection_open() diff --git a/requirements.txt b/requirements.txt index 88392d7..55e4696 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ plyer==1.4.2 keyring==11.1.0 kivy==2.3.0 mock==1.3.0 +websocket-client==1.7.0 From 7b5d0914c836494cc6734c77dceba3a82bbff468 Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Sat, 31 May 2025 15:38:06 -0700 Subject: [PATCH 2/7] function name refactoring --- podium_api/livestream.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/podium_api/livestream.py b/podium_api/livestream.py index 446fb1c..9fd1bd2 100644 --- a/podium_api/livestream.py +++ b/podium_api/livestream.py @@ -63,26 +63,26 @@ def set_alertmessage_listener(self, listener): def set_alertmessage_ack_listener(self, listener): self._alertmessage_ack_listener = listener - def on_telemetry_stream_started(self, body): + def _on_telemetry_stream_started(self, body): eventdevice_id = body.get("eventDeviceId") device_id = body.get("deviceId") logger.info("PodiumLivestream: Telemetry stream started: {} {}".format(eventdevice_id, device_id)) if listener := self._telemetry_stream_started_listener: listener(device_id, eventdevice_id) - def on_telemetry_stream_ended(self, body): + def _on_telemetry_stream_ended(self, body): eventdevice_id = body.get("eventDeviceId") device_id = body.get("deviceId") logger.info("PodiumLivestream: telemetry stream ended: {} {}".format(eventdevice_id, device_id)) if listener := self._telemetry_stream_ended_listener: listener(device_id, eventdevice_id) - def on_list_telemetry_sessions(self, body): + def _on_list_telemetry_sessions(self, body): logger.debug("PodiumLivestream: ws_list_telemetry_sessions: {}".format(body)) if listener := self._list_telemetry_sessions_listener: listener(body) - def on_sensor_data(self, body): + def _on_sensor_data(self, body): logger.debug("on sensordata %s", body) if listener := self._sensor_data_listener: listener(body) @@ -110,7 +110,7 @@ def _on_connection_open(self): if listener := self._connection_open_listener: listener() - def on_alertmessage(self, body): + def _on_alertmessage(self, body): logger.info("on alertmessage %s", body) if listener := self._alertmessage_listener: listener(body) @@ -193,19 +193,19 @@ def close_connection(self): def dispatch_msg(self, ws, address, body): if address.startswith("sensorData"): - self.on_sensor_data(body) + self._on_sensor_data(body) elif address.startswith("event"): self.on_ws_event(body) elif address == "telemetryStreamStarted": - self.on_telemetry_stream_started(body) + self._on_telemetry_stream_started(body) elif address == "telemetryStreamEnded": - self.on_telemetry_stream_ended(body) + self._on_telemetry_stream_ended(body) elif address == self.reply_uuid: - self.on_list_telemetry_sessions(body) + self._on_list_telemetry_sessions(body) elif address.startswith("alertmessage"): - self.on_alertmessage(body) + self._on_alertmessage(body) elif address.startswith("alertmsgAck"): - self.on_alertmessage_ack(body) + self._on_alertmessage_ack(body) def _on_ws_message(self, ws, message): logger.debug("PodiumLivestream: %s", message) From 09da6bdbf47a0b1fe232b2c8e5eeed3dda27ab10 Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Mon, 7 Jul 2025 21:52:33 -0700 Subject: [PATCH 3/7] add examples to test viewing livestreams, refactor to not use kivy UrlProvider, allow it to be pluggable as needed --- examples/test_livestream.py | 48 +++++------------ examples/test_livestream_gui.py | 95 +++++++++++++++++++++++++++++++++ podium_api/__init__.py | 3 -- podium_api/api.py | 14 ++--- podium_api/asyncreq.py | 21 ++++++-- podium_api/standard_request.py | 80 +++++++++++++++++++++++++++ 6 files changed, 214 insertions(+), 47 deletions(-) create mode 100644 examples/test_livestream_gui.py create mode 100644 podium_api/standard_request.py diff --git a/examples/test_livestream.py b/examples/test_livestream.py index 036f433..a1bd6ef 100644 --- a/examples/test_livestream.py +++ b/examples/test_livestream.py @@ -1,10 +1,6 @@ import argparse from time import sleep -from kivy.app import App -from kivy.lang import Builder -from kivy.uix.boxlayout import BoxLayout - from podium_api import register_podium_application from podium_api.asyncreq import get_json_header_token from podium_api.livestream import PodiumLivestream @@ -23,44 +19,21 @@ def parse_args(): args = parse_args() -KV = """ -: - orientation: 'vertical' - spacing: 10 - padding: 10 - - TextInput: - id: text_box - text: "" - multiline: True - size_hint_y: 0.9 - - Button: - text: "Start" - size_hint_y: 0.1 - on_press: app.on_button_click() -""" - -class MyWidget(BoxLayout): - pass - - -class MyApp(App): - def build(self): - Builder.load_string(KV) - return MyWidget() +class LivestreamTester: + def __init__(self): + self._livestream = None def on_connection_open(self): print("connection opened") - self._livestream.list_telemetry_sessions() + self.request_current_livestreams() def on_connection_close(self): print("connection closed") def on_list_telemetry_sessions(self, body): print(f"telemetry sessions: {body}") - + def start_livestream(self, token): self._livestream = livestream = PodiumLivestream() livestream.set_connection_open_listener(self.on_connection_open) @@ -84,9 +57,14 @@ def login(self): args.username, args.password, success_callback=self.login_success, failure_callback=self.login_failure ) - def on_button_click(self): - self.login() + def request_current_livestreams(self): + if self._livestream: + self._livestream.list_telemetry_sessions() if __name__ == "__main__": - MyApp().run() + tester = LivestreamTester() + tester.login() + while True: + tester.request_current_livestreams() + sleep(1) diff --git a/examples/test_livestream_gui.py b/examples/test_livestream_gui.py new file mode 100644 index 0000000..622ae39 --- /dev/null +++ b/examples/test_livestream_gui.py @@ -0,0 +1,95 @@ +import argparse +from time import sleep + +from kivy.app import App +from kivy.lang import Builder +from kivy.network.urlrequest import UrlRequest as KivyUrlRequest +from kivy.uix.boxlayout import BoxLayout + +from podium_api import register_podium_application +from podium_api.asyncreq import get_json_header_token, set_urlrequest_class +from podium_api.livestream import PodiumLivestream +from podium_api.login import make_login_post + + +def parse_args(): + parser = argparse.ArgumentParser(description="Parse app connection parameters.") + parser.add_argument("--app_id", required=True, help="Application ID") + parser.add_argument("--api_uri", required=True, help="Application API URI") + parser.add_argument("--ws_uri", required=True, help="WebSocket API URI") + parser.add_argument("--username", required=True, help="Username") + parser.add_argument("--password", required=True, help="Password") + return parser.parse_args() + + +args = parse_args() + +KV = """ +: + orientation: 'vertical' + spacing: 10 + padding: 10 + + TextInput: + id: text_box + text: "" + multiline: True + size_hint_y: 0.9 + + Button: + text: "Start" + size_hint_y: 0.1 + on_press: app.on_button_click() +""" + + +class MyWidget(BoxLayout): + pass + + +class MyApp(App): + def build(self): + Builder.load_string(KV) + return MyWidget() + + def on_connection_open(self): + print("connection opened") + self._livestream.list_telemetry_sessions() + + def on_connection_close(self): + print("connection closed") + + def on_list_telemetry_sessions(self, body): + print(f"telemetry sessions: {body}") + + def start_livestream(self, token): + self._livestream = livestream = PodiumLivestream() + livestream.set_connection_open_listener(self.on_connection_open) + livestream.set_connection_close_listener(self.on_connection_close) + livestream.set_list_telemetry_sessions_listener(self.on_list_telemetry_sessions) + + header = get_json_header_token(token) + livestream.open_connection(args.ws_uri, header) + + def login_success(self, token): + print(f"login success: {token}") + self.start_livestream(token) + + def login_failure(self, error_type, results, data): + print(f"login failed! error_type: {error_type}; results: {results}; data: {data}") + + def login(self): + # needed for Kivy's eventing model when handling UrlRequests + set_urlrequest_class(KivyUrlRequest) + register_podium_application(args.app_id, "", args.api_uri) + + make_login_post( + args.username, args.password, success_callback=self.login_success, failure_callback=self.login_failure + ) + + def on_button_click(self): + self.login() + + +if __name__ == "__main__": + MyApp().run() diff --git a/podium_api/__init__.py b/podium_api/__init__.py index edef1e2..2b8d188 100644 --- a/podium_api/__init__.py +++ b/podium_api/__init__.py @@ -3,9 +3,6 @@ from podium_api.types.application import PodiumApplication """ -The podium_api module allows you to asynchronously interact with the Podium -API. It is built on top of Kivy's UrlRequest. - **Module Attributes:** """ diff --git a/podium_api/api.py b/podium_api/api.py index 45d4455..9f59d81 100644 --- a/podium_api/api.py +++ b/podium_api/api.py @@ -1,7 +1,6 @@ +import logging from typing import Any, Callable -from kivy.logger import Logger - import podium_api from podium_api.account import make_account_get from podium_api.alertmessages import ( @@ -110,6 +109,9 @@ def __init__(self, token): self.podium_account = None self.podium_user = None + self._logger = logger = logging.getLogger("PodiumAPI") + logger.setLevel(logging.INFO) + def init_connection( self, success_callback: Callable[[PodiumAccount, PodiumUser], None], @@ -121,12 +123,12 @@ def _load_account( self, success_callback: Callable[[PodiumAccount], None], failure_callback: Callable[[str, str, str], None] ) -> None: def success(account: PodiumAccount): - Logger.info("PodiumAPI: Loaded Account %s", account.username) + self._logger.info("PodiumAPI: Loaded Account %s", account.username) self.podium_account = account self._load_user(account, success_callback, failure_callback) def failure(error_type: str, results: str, data: str) -> None: - Logger.error("PodiumAPI: Failed to load account: %s: %s", error_type, results) + self._logger.error("PodiumAPI: Failed to load account: %s: %s", error_type, results) failure_callback(error_type, results, data) self.account.get(success_callback=success, failure_callback=failure) @@ -138,12 +140,12 @@ def _load_user( failure_callback: Callable[[str, str, str], None], ) -> None: def success(user): - Logger.info("PodiumAPI: Loaded User %s", user.username) + self._logger.info("PodiumAPI: Loaded User %s", user.username) self.podium_user = user success_callback(account, user) def failure(error_type: str, results: str, data: str): - Logger.error("PodiumAPI: Failed to load user: %s: %s", error_type, results) + self._logger.error("PodiumAPI: Failed to load user: %s: %s", error_type, results) failure_callback(error_type, results, data) self.users.get(account.user_uri, success_callback=success, failure_callback=failure) diff --git a/podium_api/asyncreq.py b/podium_api/asyncreq.py index d26bfe8..b0809b1 100644 --- a/podium_api/asyncreq.py +++ b/podium_api/asyncreq.py @@ -1,7 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from kivy.network.urlrequest import UrlRequest - import podium_api try: @@ -9,8 +7,24 @@ except: from urllib import urlencode +from podium_api.standard_request import StandardUrlRequest from podium_api.types.exceptions import PodiumApplicationNotRegistered +_urlrequest_cls = None + + +def set_urlrequest_class(cls): + global _urlrequest_cls + _urlrequest_cls = cls + + +def get_urlrequest_class(): + global _urlrequest_cls + if _urlrequest_cls is not None: + return _urlrequest_cls + else: + return StandardUrlRequest + def get_json_header_token(token): """ @@ -119,7 +133,8 @@ def make_request( endpoint = "{}&{}".format(endpoint, params) else: endpoint = "{}?{}".format(endpoint, params) - return UrlRequest( + ReqClass = get_urlrequest_class() + return ReqClass( endpoint, method=method, req_body=body, diff --git a/podium_api/standard_request.py b/podium_api/standard_request.py new file mode 100644 index 0000000..03a9f2b --- /dev/null +++ b/podium_api/standard_request.py @@ -0,0 +1,80 @@ +import json +import threading + +import requests + + +class StandardUrlRequest: + def __init__( + self, + url, + method="GET", + req_body=None, + req_headers=None, + on_success=None, + on_failure=None, + on_error=None, + on_redirect=None, + on_progress=None, + **kwargs + ): + self._url = url + self._method = method + self._body = req_body + self._headers = req_headers or {} + self._on_success = on_success + self._on_failure = on_failure + self._on_error = on_error + self._on_redirect = on_redirect + self._on_progress = on_progress + + # Start the request in a separate thread so callbacks fire “asynchronously” + thread = threading.Thread(target=self._run) + thread.daemon = True + thread.start() + + def _run(self): + try: + # We do not implement chunked progress or true redirects: + # we simply make one HTTP call, parse JSON, and call on_success or on_failure + resp = requests.request( + self._method, self._url, headers=self._headers, data=self._body, allow_redirects=False + ) + + # If you want to support on_redirect, check 3xx codes: + if 300 <= resp.status_code < 400 and self._on_redirect: + self._on_redirect(self, resp.json() if self._can_parse_json(resp) else resp.text) + return + + # If status_code is 200–299, treat as success, else failure + if 200 <= resp.status_code < 300: + parsed = resp.json() if self._can_parse_json(resp) else resp.text + if self._on_success: + self._on_success(self, parsed) + else: + parsed = resp.json() if self._can_parse_json(resp) else resp.text + if self._on_failure: + self._on_failure(self, parsed) + + except Exception as e: + # Any exception (network error, JSON parse error, etc.) → on_error + if self._on_error: + self._on_error(self, e) + + def _can_parse_json(self, resp): + ct = resp.headers.get("Content-Type", "") + return "application/json" in ct + + @property + def _resp_headers(self): + # Not implemented; for backward compatibility you could store resp.headers + return {} + + @property + def result(self): + # You could cache the JSON or .text before calling callbacks if you want + return None + + @property + def status_code(self): + return None From 5508245ca2d1fe4e195049528554df0d658d8ccc Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Tue, 8 Jul 2025 07:05:55 -0700 Subject: [PATCH 4/7] update version to 1.0.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0ae027d..5492de7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="podium_api", packages=["podium_api", "podium_api.types"], - version="0.2.0", + version="1.0.0", description="API for Podium Motorsports Platform http://podium.live", author="Autosport Labs", author_email="sales@autosportlabs.com", From be3dfb244960d71e9792079e257c4b15495b5561 Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Sun, 20 Jul 2025 15:30:24 -0700 Subject: [PATCH 5/7] refine example to request sensordata for each eventdevice --- examples/test_livestream.py | 144 ++++++++++++++++++++++++++------ examples/test_livestream_gui.py | 95 --------------------- 2 files changed, 118 insertions(+), 121 deletions(-) delete mode 100644 examples/test_livestream_gui.py diff --git a/examples/test_livestream.py b/examples/test_livestream.py index a1bd6ef..2e84a2e 100644 --- a/examples/test_livestream.py +++ b/examples/test_livestream.py @@ -1,14 +1,45 @@ +# Demo to access livestream data for the user's specified event +# +# How to run +# python test_livestream.py --event_name= --app_id= --api_uri= --ws_uri= --username= --password= +# +# +# Operation flow +# +# Login +# Fetch my most recent events +# Find specific event matching specified name +# Get eventdevices for the event +# request live sensor data for each eventdevice in event +# print out live sensor data for each eventdevice +# +# To test: +# * Ensure your device is not currently broadcast real-time data (to prevent it from auto-creating an ad-hoc event) +# * Create an event with a specified name - e.g. "my_event", with a date range that encompasses the current date (now) +# * Add your device to this event +# * Enable your device to broadcast real-time data +# * Launch this script with the appropriate parameters +# +# Note - If an ad-hoc event was already created, you can simply rename the event to your preferred name, and adjust the start / end +# times as necessary + import argparse from time import sleep from podium_api import register_podium_application +from podium_api.api import PodiumAPI from podium_api.asyncreq import get_json_header_token +from podium_api.events import make_events_get from podium_api.livestream import PodiumLivestream from podium_api.login import make_login_post +from podium_api.types.account import PodiumAccount +from podium_api.types.event import PodiumEvent +from podium_api.types.user import PodiumUser def parse_args(): parser = argparse.ArgumentParser(description="Parse app connection parameters.") + parser.add_argument("--event_name", required=True, help="Application ID") parser.add_argument("--app_id", required=True, help="Application ID") parser.add_argument("--api_uri", required=True, help="Application API URI") parser.add_argument("--ws_uri", required=True, help="WebSocket API URI") @@ -20,51 +51,112 @@ def parse_args(): args = parse_args() +class Connection(object): + def __init__(self, token, podium, account, user, livestream): + self.token = token + self.podium = podium + self.account = account + self.user = user + self.livestream = livestream + + class LivestreamTester: def __init__(self): - self._livestream = None - - def on_connection_open(self): - print("connection opened") - self.request_current_livestreams() + self.connection = None - def on_connection_close(self): - print("connection closed") + def _on_connection_open(self): + print("Websocket connection opened") - def on_list_telemetry_sessions(self, body): - print(f"telemetry sessions: {body}") + def _on_connection_close(self): + print("Websocket connection closed") - def start_livestream(self, token): - self._livestream = livestream = PodiumLivestream() - livestream.set_connection_open_listener(self.on_connection_open) - livestream.set_connection_close_listener(self.on_connection_close) - livestream.set_list_telemetry_sessions_listener(self.on_list_telemetry_sessions) + def _sensor_data_listener(self, data): + print(f"sensor data {data}") + def _start_livestream_connection(self, token: str): + print("Starting livestream connection") + livestream = PodiumLivestream() + livestream.set_connection_open_listener(self._on_connection_open) + livestream.set_connection_close_listener(self._on_connection_close) + livestream.set_sensor_data_listener(self._sensor_data_listener) header = get_json_header_token(token) livestream.open_connection(args.ws_uri, header) + return livestream - def login_success(self, token): - print(f"login success: {token}") - self.start_livestream(token) + def _init_connection(self, token: str): + def _success(account: PodiumAccount, user: PodiumUser): + print(f"Init connection success") + livestream = self._start_livestream_connection(token) + self.connection = Connection(token=token, podium=podium, account=account, user=user, livestream=livestream) - def login_failure(self, error_type, results, data): - print(f"login failed! error_type: {error_type}; results: {results}; data: {data}") + def _failure(error_type, results, data): + print(f"Podium API init failure! error_type: {error_type}; results: {results}; data: {data}") + + self._podium = podium = PodiumAPI(token) + podium.init_connection(success_callback=_success, failure_callback=_failure) def login(self): - register_podium_application(args.app_id, "", args.api_uri) + def _success(token: str): + print(f"Login success") + self._init_connection(token) + + def _failure(error_type, results, data): + print(f"login failed! error_type: {error_type}; results: {results}; data: {data}") - make_login_post( - args.username, args.password, success_callback=self.login_success, failure_callback=self.login_failure + register_podium_application(args.app_id, "", args.api_uri) + make_login_post(args.username, args.password, success_callback=_success, failure_callback=_failure) + + def get_event_for_name(self, event_name: str, callback): + uri = f"{args.api_uri}/api/v1/account/events" + + def _success(paged_response): + print(f"make_events_get success - got {len(paged_response.events)} results matching {event_name}") + for event in paged_response.events: + print(f"Picking the first matched event ({event.title})") + callback(event) + + def _failure(error_type, results, data): + print(f"Events failed! error_type: {error_type}; results: {results}; data: {data}") + + make_events_get( + self.connection.token, + endpoint=uri, + success_callback=_success, + failure_callback=_failure, + title=event_name, ) - def request_current_livestreams(self): - if self._livestream: - self._livestream.list_telemetry_sessions() + def register_sensordata_for_event(self, event: PodiumEvent): + def _success(paged_response): + print(f"Got eventdevices for event {event.title}") + for eventdevice in paged_response.eventdevices: + eventdevice_id = eventdevice.eventdevice_id + print(f"Registering sensor data for eventdevice_id {eventdevice_id}") + self.connection.livestream.register_sensor_data(eventdevice_id) + + def _failure(error_type, results, data): + print(f"Eventdevices failed! error_type: {error_type}; results: {results}; data: {data}") + + self.connection.podium.eventdevices.list( + endpoint=event.devices_uri, + success_callback=_success, + failure_callback=_failure, + ) if __name__ == "__main__": + + def got_event_callback(event: PodiumEvent): + print(f"Got event {event.title}, registering sensordata for each eventdevice") + tester.register_sensordata_for_event(event) + tester = LivestreamTester() tester.login() + + # wait for connection initialization + while not tester.connection: + sleep(1) + tester.get_event_for_name(args.event_name, got_event_callback) + while True: - tester.request_current_livestreams() sleep(1) diff --git a/examples/test_livestream_gui.py b/examples/test_livestream_gui.py deleted file mode 100644 index 622ae39..0000000 --- a/examples/test_livestream_gui.py +++ /dev/null @@ -1,95 +0,0 @@ -import argparse -from time import sleep - -from kivy.app import App -from kivy.lang import Builder -from kivy.network.urlrequest import UrlRequest as KivyUrlRequest -from kivy.uix.boxlayout import BoxLayout - -from podium_api import register_podium_application -from podium_api.asyncreq import get_json_header_token, set_urlrequest_class -from podium_api.livestream import PodiumLivestream -from podium_api.login import make_login_post - - -def parse_args(): - parser = argparse.ArgumentParser(description="Parse app connection parameters.") - parser.add_argument("--app_id", required=True, help="Application ID") - parser.add_argument("--api_uri", required=True, help="Application API URI") - parser.add_argument("--ws_uri", required=True, help="WebSocket API URI") - parser.add_argument("--username", required=True, help="Username") - parser.add_argument("--password", required=True, help="Password") - return parser.parse_args() - - -args = parse_args() - -KV = """ -: - orientation: 'vertical' - spacing: 10 - padding: 10 - - TextInput: - id: text_box - text: "" - multiline: True - size_hint_y: 0.9 - - Button: - text: "Start" - size_hint_y: 0.1 - on_press: app.on_button_click() -""" - - -class MyWidget(BoxLayout): - pass - - -class MyApp(App): - def build(self): - Builder.load_string(KV) - return MyWidget() - - def on_connection_open(self): - print("connection opened") - self._livestream.list_telemetry_sessions() - - def on_connection_close(self): - print("connection closed") - - def on_list_telemetry_sessions(self, body): - print(f"telemetry sessions: {body}") - - def start_livestream(self, token): - self._livestream = livestream = PodiumLivestream() - livestream.set_connection_open_listener(self.on_connection_open) - livestream.set_connection_close_listener(self.on_connection_close) - livestream.set_list_telemetry_sessions_listener(self.on_list_telemetry_sessions) - - header = get_json_header_token(token) - livestream.open_connection(args.ws_uri, header) - - def login_success(self, token): - print(f"login success: {token}") - self.start_livestream(token) - - def login_failure(self, error_type, results, data): - print(f"login failed! error_type: {error_type}; results: {results}; data: {data}") - - def login(self): - # needed for Kivy's eventing model when handling UrlRequests - set_urlrequest_class(KivyUrlRequest) - register_podium_application(args.app_id, "", args.api_uri) - - make_login_post( - args.username, args.password, success_callback=self.login_success, failure_callback=self.login_failure - ) - - def on_button_click(self): - self.login() - - -if __name__ == "__main__": - MyApp().run() From 682d75d0c1523af96a5da9e4925135fe74f6a8b9 Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Sun, 20 Jul 2025 15:33:50 -0700 Subject: [PATCH 6/7] fix grammar --- examples/test_livestream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/test_livestream.py b/examples/test_livestream.py index 2e84a2e..143f55c 100644 --- a/examples/test_livestream.py +++ b/examples/test_livestream.py @@ -14,8 +14,8 @@ # print out live sensor data for each eventdevice # # To test: -# * Ensure your device is not currently broadcast real-time data (to prevent it from auto-creating an ad-hoc event) -# * Create an event with a specified name - e.g. "my_event", with a date range that encompasses the current date (now) +# * Ensure your device is not currently broadcasting real-time data (to prevent it from auto-creating an ad-hoc event) +# * Create an event with a specified name - e.g. "my_event", with a date range that encompasses the current date # * Add your device to this event # * Enable your device to broadcast real-time data # * Launch this script with the appropriate parameters From 8a89a20345a573c9ce6d87692029e01ba6ed966e Mon Sep 17 00:00:00 2001 From: Brent Picasso Date: Sun, 20 Jul 2025 15:35:04 -0700 Subject: [PATCH 7/7] fix bug in sensor data listener --- podium_api/livestream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/podium_api/livestream.py b/podium_api/livestream.py index 9fd1bd2..dafd2a4 100644 --- a/podium_api/livestream.py +++ b/podium_api/livestream.py @@ -43,7 +43,7 @@ def set_list_telemetry_sessions_listener(self, listener): self._list_telemetry_sessions_listener = listener def set_sensor_data_listener(self, listener): - self._list_telemetry_sessions_listener = listener + self._sensor_data_listener = listener def set_connection_open_listener(self, listener): self._connection_open_listener = listener