Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Toga GTK Camera API #3050

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/reference/api/hardware/camera.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ invoking any camera API. If permission has not yet been granted, the platform *m
request access at the time of first camera access; however, this is not guaranteed to be
the behavior on all platforms.

System requirements
-------------------

The Camera API on Linux uses the XDG Camera portal, even if the application
is running outside of Flatpak. Linux users must install a XDG Desktop Portal
implementation. Most desktop environments provide an implementation and
many popular distros will work out-of-the-box.

Additionally, GStreamer and WirePlumber must be installed, along with their
GObject introspection type libraries.

``TODO``

Notes
-----

Expand Down
20 changes: 20 additions & 0 deletions gtk/src/toga_gtk/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import signal

from jeepney.io.asyncio import DBusRouter, open_dbus_router

from toga.app import App as toga_App
from toga.command import Separator

Expand Down Expand Up @@ -60,6 +62,20 @@ def gtk_startup(self, data=None):
Gdk.Screen.get_default(), css_provider, Gtk.STYLE_PROVIDER_PRIORITY_USER
)

_dbus_router: None | DBusRouter = None

async def dbus_router(self):
"""Retrieve the ``DBusRouter`` to use for D-Bus communication.

This method caches the ``DBusRouter`` to avoid creating multiple concurrent
connections when. The router must be manually closed, which is handled in
:ref:`~.App.exit`."""
if self._dbus_router is None:
self._dbus_router = open_dbus_router()
await self._dbus_router.__aenter__()

return self._dbus_router

######################################################################
# Commands and menus
######################################################################
Expand Down Expand Up @@ -146,6 +162,10 @@ def create_menus(self):
def exit(self): # pragma: no cover
self.native.quit()

# Is this the right way to shut down?
if self._dbus_router is not None:
asyncio.create_task(self._dbus_router.__aexit__(None, None, None))

def main_loop(self):
# Modify signal handlers to make sure Ctrl-C is caught and handled.
signal.signal(signal.SIGINT, signal.SIG_DFL)
Expand Down
2 changes: 2 additions & 0 deletions gtk/src/toga_gtk/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .app import App
from .command import Command
from .fonts import Font
from .hardware.camera import Camera
from .hardware.location import Location
from .icons import Icon
from .images import Image
Expand Down Expand Up @@ -50,6 +51,7 @@ def not_implemented(feature):
"Paths",
"dialogs",
# Hardware
"Camera",
"Location",
# Status icons
"MenuStatusIcon",
Expand Down
203 changes: 203 additions & 0 deletions gtk/src/toga_gtk/hardware/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from __future__ import annotations

import asyncio

from toga_gtk.libs import FeatureRequiresMissingDependency, Gio, GLib, Gst, Wp


class CameraDevice:
pass


class Camera:
def __init__(self, interface):
if Gst is None:
# CI (where coverage is enforced) must always have GStreamer available
# in order to perform the rest of the tests
raise FeatureRequiresMissingDependency(
"camera", "GStreamer", "hardware/camera.html#system-requirements"
) # pragma: no cover

if Wp is None:
# CI (where coverage is enforced) must always have WirePlumber available
# in order to perform the rest of the tests
raise FeatureRequiresMissingDependency(
"camera", "WirePlumber", "hardware/camera.html#system-requirements"
) # pragma: no cover

Gst.init(None)
Wp.init(Wp.InitFlags.PIPEWIRE)

self.interface = interface

self.permission_result: None | bool = None

_handle_token_count = 0

def _get_handle_token(self):
self._handle_token_count += 1
return str(self._handle_token_count)

def has_permission(self):
return bool(self.permission_result)

def _create_portal_proxy(self) -> asyncio.Future[Gio.DBusProxy]:
future = asyncio.Future()

def finish(_, task, *__):
try:
portal_proxy = Gio.DBusProxy.new_for_bus_finish(task)
except Exception as e:
future.set_exception(e)
else:
future.set_result(portal_proxy)

Gio.DBusProxy.new_for_bus(
bus_type=Gio.BusType.SESSION,
flags=Gio.DBusProxyFlags.NONE,
info=None,
name="org.freedesktop.portal.Desktop",
object_path="/org/freedesktop/portal/desktop",
interface_name="org.freedesktop.portal.Camera",
cancellable=None,
callback=finish,
user_data=None,
)

return future

def _subscribe_to_access_response(
self, connection, request_path
) -> asyncio.Future[bool]:
future = asyncio.Future()

def callback(
connection,
sender_name,
object_path,
interface_name,
signal_name,
parameters,
*user_data,
):
# parameters will be "(ua{sv})", i.e., a tuple[int, dict]
unwrapped_response = parameters.get_child_value(0).get_uint32()
future.set_result(unwrapped_response)

connection.signal_subscribe(
sender="org.freedesktop.portal.Desktop",
interface_name="org.freedesktop.portal.Request",
member="Response",
object_path=request_path,
arg0=None,
flags=Gio.DBusSignalFlags.NONE,
callback=callback,
user_data=None,
)

return future

def _get_access_camera_request_handle(self, connection) -> tuple[str, str]:
name = connection.get_unique_name()[1:].replace(".", "_")
token = f"access_camera_{self._get_handle_token()}"

path = f"/org/freedesktop/portal/desktop/request/{name}/{token}"

return path, token

def _access_camera(self, portal, handle_token) -> asyncio.Future[str]:
future = asyncio.Future()

def result_handler(_, result, *__):
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)

portal.AccessCamera(
"(a{sv})",
{"handle_token": GLib.Variant("s", handle_token)},
result_handler=result_handler,
)

return future

async def _request_permission(self, future):
try:
self.portal = await self._get_portal_proxy()
connection = self.portal.get_connection()
request_path, handle_token = self._get_access_camera_request_handle(
connection
)

# Subscribe _before_ sending the request to prevent possibility of race
# conditions. See docs (linked below) for further details about proper
# handling of the portal ``Request``/``Response`` cycle
# https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html#description
access_response_future = self._subscribe_to_access_response(
connection, request_path
)

actual_path = await self._access_camera(self.portal, handle_token)
# XDG implementations < 0.9 will not use the standard request path.
# As such, if the actual path returned by AccessCamera differs from
# the one created above, then a new response subscription is needed and
# the potential race condition cannot be avoided.
# See XDG Request docs linked above for further details on this quirk
if actual_path != request_path:
access_response_future = self._subscribe_to_access_response(
connection, actual_path
)

access_response = await access_response_future

# https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html#org-freedesktop-portal-request-response
# 0 -> user allowed camera access
# 1 -> user denied access
# 2 -> something else happened (but not approval)
self.permission_result = access_response == 0
except Exception as e:
future.set_exception(e)
else:
# if self.permission_result:
# await self._populate_devices()

future.set_result(self.permission_result)

def request_permission(self, future):
asyncio.create_task(self._request_permission(future))

def _get_glib_main_context(self):
loop = asyncio.get_running_loop()
breakpoint()
return loop._context

# async def _populate_devices(self):
# fd = await self._open_pipe_wire_remote()
# main_context = self._get_glib_main_context()

# self.wp_obj_manager = Wp.ObjectManager.new()
# self.wp_core = Wp.Core.new(main_context, None, None)
# self.wp_core.connect()

def _open_pipe_wire_remote(self) -> asyncio.Future[GLib.DBus]:
future = asyncio.Future()

def result_handler(_, result, *__):
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)

self.portal.OpenPipeWireRemote(
"(a{sv})",
{},
result_handler=result_handler,
)

return future

def get_devices(self):
if not (self.has_permission and self.portal):
# cannot list devices without permission or if the portal is not initialised
return []
22 changes: 22 additions & 0 deletions gtk/src/toga_gtk/libs/gtk.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,25 @@
from gi.repository import Flatpak # noqa: F401
except (ImportError, ValueError): # pragma: no cover
Flatpak = None

try:
gi.require_version("Gst", "1.0")
from gi.repository import Gst # noqa: F401
except (ImportError, ValueError): # pragma: no cover
Gst = None

try:
gi.require_version("Wp", "0.5")
from gi.repository import Wp # noqa: F401
except (ImportError, ValueError): # pragma: no cover
Wp = None


class FeatureRequiresMissingDependency(RuntimeError):
def __init__(self, feature: str, library: str, docs_path: str):
docs_url = f"https://toga.readthedocs.io/en/stable/reference/api/{docs_path}"
super().__init__(
f"{feature.title()} requires the missing dependency {library}. Ensure "
f"that the system package providing {library} and its GTK bindings have "
f"been installed. See {docs_url} for details."
)
18 changes: 18 additions & 0 deletions gtk/tests_backend/hardware/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from ..app import AppProbe


class CameraProbe(AppProbe):
# Linux cannot support this because permission must be
# requested before devices can be listed, and it's only possible
# to reach the "first use" if a device is already identified
request_permission_on_first_use = False

def __init__(self, monkeypatch, app_probe):
self._verify_dependencies()
super().__init__(app_probe.app)

def cleanup(self): ...

def _verify_dependencies(self): ...

def allow_permission(self): ...
2 changes: 1 addition & 1 deletion testbed/tests/hardware/test_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@pytest.fixture(
params=list_probes(
"camera",
skip_platforms=("linux", "windows"),
skip_platforms=("windows",),
skip_unbundled=True,
)
)
Expand Down
Loading