# ABOUTME: fprintd D-Bus integration for fingerprint authentication. # ABOUTME: Provides FingerprintListener that runs async in the GLib mainloop. import logging from typing import Callable logger = logging.getLogger(__name__) import gi gi.require_version("Gio", "2.0") from gi.repository import Gio, GLib FPRINTD_BUS_NAME = "net.reactivated.Fprint" FPRINTD_MANAGER_PATH = "/net/reactivated/Fprint/Manager" FPRINTD_MANAGER_IFACE = "net.reactivated.Fprint.Manager" FPRINTD_DEVICE_IFACE = "net.reactivated.Fprint.Device" # Retry-able statuses (finger not read properly, try again) _RETRY_STATUSES = { "verify-swipe-too-short", "verify-finger-not-centered", "verify-remove-and-retry", "verify-retry-scan", } class FingerprintListener: """Listens for fingerprint verification events via fprintd D-Bus.""" def __init__(self) -> None: self._device_proxy: Gio.DBusProxy | None = None self._device_path: str | None = None self._signal_id: int | None = None self._running: bool = False self._on_success: Callable[[], None] | None = None self._on_failure: Callable[[], None] | None = None self._init_device() def _init_device(self) -> None: """Connect to fprintd and get the default device.""" try: manager = Gio.DBusProxy.new_for_bus_sync( Gio.BusType.SYSTEM, Gio.DBusProxyFlags.NONE, None, FPRINTD_BUS_NAME, FPRINTD_MANAGER_PATH, FPRINTD_MANAGER_IFACE, None, ) result = manager.GetDefaultDevice() if result: self._device_path = result self._device_proxy = Gio.DBusProxy.new_for_bus_sync( Gio.BusType.SYSTEM, Gio.DBusProxyFlags.NONE, None, FPRINTD_BUS_NAME, self._device_path, FPRINTD_DEVICE_IFACE, None, ) except GLib.Error: self._device_proxy = None self._device_path = None def is_available(self, username: str) -> bool: """Check if fprintd is running and the user has enrolled fingerprints.""" if not self._device_proxy: return False try: result = self._device_proxy.ListEnrolledFingers("(s)", username) return bool(result) except GLib.Error: return False def start( self, username: str, on_success: Callable[[], None], on_failure: Callable[[], None], ) -> None: """Start listening for fingerprint verification.""" if not self._device_proxy: return self._on_success = on_success self._on_failure = on_failure try: self._device_proxy.Claim("(s)", username) except GLib.Error as e: logger.error("Failed to claim fingerprint device: %s", e.message) return # Connect to the VerifyStatus signal self._signal_id = self._device_proxy.connect( "g-signal", self._on_signal ) try: self._device_proxy.VerifyStart("(s)", "any") except GLib.Error as e: logger.error("Failed to start fingerprint verification: %s", e.message) self._device_proxy.disconnect(self._signal_id) self._signal_id = None try: self._device_proxy.Release() except GLib.Error: pass return self._running = True def stop(self) -> None: """Stop listening and release the device.""" if not self._running: return self._running = False if self._device_proxy: if self._signal_id is not None: self._device_proxy.disconnect(self._signal_id) self._signal_id = None try: self._device_proxy.VerifyStop() except GLib.Error: pass try: self._device_proxy.Release() except GLib.Error: pass def _on_signal( self, proxy: Gio.DBusProxy, sender_name: str | None, signal_name: str, parameters: GLib.Variant, ) -> None: """Handle D-Bus signals from the fprintd device.""" if signal_name != "VerifyStatus": return status = parameters[0] done = parameters[1] self._on_verify_status(status, done) def _on_verify_status(self, status: str, done: bool) -> None: """Process a VerifyStatus signal from fprintd.""" if not self._running: return if status == "verify-match": if self._on_success: self._on_success() return if status in _RETRY_STATUSES: # Retry — finger wasn't read properly if done: self._device_proxy.VerifyStart("(s)", "any") return if status == "verify-no-match": if self._on_failure: self._on_failure() # Restart verification for another attempt if done: self._device_proxy.VerifyStart("(s)", "any") return