Add crash guard, logging, and defensive error handling to prevent lockout
Global sys.excepthook unlocks the session on unhandled exceptions. Structured logging to stderr and optional file at /var/cache/moonlock/. Window creation, CSS loading, and fingerprint start wrapped in try/except with automatic session unlock when all windows fail.
This commit is contained in:
parent
7cee4f4f8d
commit
3f31387305
@ -4,7 +4,7 @@
|
||||
# Maintainer: Dominik Kressler
|
||||
|
||||
pkgname=moonlock-git
|
||||
pkgver=0.1.1.r0.g22f725e
|
||||
pkgver=0.2.0.r0.g7cee4f4
|
||||
pkgrel=1
|
||||
pkgdesc="A secure Wayland lockscreen with GTK4, PAM and fingerprint support"
|
||||
arch=('any')
|
||||
|
||||
@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "moonlock"
|
||||
version = "0.2.0"
|
||||
version = "0.2.1"
|
||||
description = "A secure Wayland lockscreen with GTK4, PAM and fingerprint support"
|
||||
requires-python = ">=3.11"
|
||||
license = "MIT"
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
# ABOUTME: fprintd D-Bus integration for fingerprint authentication.
|
||||
# ABOUTME: Provides FingerprintListener that runs async in the GLib mainloop.
|
||||
|
||||
import logging
|
||||
from typing import Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import gi
|
||||
gi.require_version("Gio", "2.0")
|
||||
from gi.repository import Gio, GLib
|
||||
@ -84,16 +87,31 @@ class FingerprintListener:
|
||||
|
||||
self._on_success = on_success
|
||||
self._on_failure = on_failure
|
||||
self._running = True
|
||||
|
||||
self._device_proxy.Claim("(s)", username)
|
||||
try:
|
||||
self._device_proxy.Claim("(s)", username)
|
||||
except GLib.Error as e:
|
||||
logger.error("Failed to claim fingerprint device: %s", e.message)
|
||||
return
|
||||
|
||||
# Connect to the VerifyStatus signal
|
||||
self._signal_id = self._device_proxy.connect(
|
||||
"g-signal", self._on_signal
|
||||
)
|
||||
|
||||
self._device_proxy.VerifyStart("(s)", "any")
|
||||
try:
|
||||
self._device_proxy.VerifyStart("(s)", "any")
|
||||
except GLib.Error as e:
|
||||
logger.error("Failed to start fingerprint verification: %s", e.message)
|
||||
self._device_proxy.disconnect(self._signal_id)
|
||||
self._signal_id = None
|
||||
try:
|
||||
self._device_proxy.Release()
|
||||
except GLib.Error:
|
||||
pass
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop listening and release the device."""
|
||||
|
||||
@ -1,14 +1,21 @@
|
||||
# ABOUTME: Entry point for Moonlock — sets up GTK Application and ext-session-lock-v1.
|
||||
# ABOUTME: Handles CLI invocation, session locking, and multi-monitor support.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from importlib.resources import files
|
||||
from pathlib import Path
|
||||
|
||||
# gtk4-layer-shell must be loaded before libwayland-client
|
||||
_LAYER_SHELL_LIB = "/usr/lib/libgtk4-layer-shell.so"
|
||||
_existing_preload = os.environ.get("LD_PRELOAD", "")
|
||||
if _LAYER_SHELL_LIB not in _existing_preload and os.path.exists(_LAYER_SHELL_LIB):
|
||||
_is_testing = "pytest" in sys.modules or "unittest" in sys.modules
|
||||
if (
|
||||
not _is_testing
|
||||
and _LAYER_SHELL_LIB not in _existing_preload
|
||||
and os.path.exists(_LAYER_SHELL_LIB)
|
||||
):
|
||||
os.environ["LD_PRELOAD"] = f"{_existing_preload}:{_LAYER_SHELL_LIB}".lstrip(":")
|
||||
os.execvp(sys.executable, [sys.executable, "-m", "moonlock.main"] + sys.argv[1:])
|
||||
|
||||
@ -21,6 +28,8 @@ from moonlock.config import load_config
|
||||
from moonlock.fingerprint import FingerprintListener
|
||||
from moonlock.lockscreen import LockscreenWindow
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ext-session-lock-v1 via gtk4-layer-shell
|
||||
try:
|
||||
gi.require_version("Gtk4SessionLock", "1.0")
|
||||
@ -29,6 +38,33 @@ try:
|
||||
except (ValueError, ImportError):
|
||||
HAS_SESSION_LOCK = False
|
||||
|
||||
_LOG_DIR = Path("/var/cache/moonlock")
|
||||
_LOG_FILE = _LOG_DIR / "moonlock.log"
|
||||
|
||||
|
||||
def _setup_logging() -> None:
|
||||
"""Configure logging to stderr and optionally to a log file."""
|
||||
root = logging.getLogger()
|
||||
root.setLevel(logging.INFO)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s %(levelname)s %(name)s: %(message)s"
|
||||
)
|
||||
|
||||
stderr_handler = logging.StreamHandler(sys.stderr)
|
||||
stderr_handler.setLevel(logging.INFO)
|
||||
stderr_handler.setFormatter(formatter)
|
||||
root.addHandler(stderr_handler)
|
||||
|
||||
if _LOG_DIR.is_dir():
|
||||
try:
|
||||
file_handler = logging.FileHandler(_LOG_FILE)
|
||||
file_handler.setLevel(logging.INFO)
|
||||
file_handler.setFormatter(formatter)
|
||||
root.addHandler(file_handler)
|
||||
except PermissionError:
|
||||
logger.warning("Cannot write to %s", _LOG_FILE)
|
||||
|
||||
|
||||
class MoonlockApp(Gtk.Application):
|
||||
"""GTK Application for the Moonlock lockscreen."""
|
||||
@ -62,15 +98,22 @@ class MoonlockApp(Gtk.Application):
|
||||
|
||||
for i in range(monitors.get_n_items()):
|
||||
monitor = monitors.get_item(i)
|
||||
window = LockscreenWindow(
|
||||
application=self,
|
||||
unlock_callback=self._unlock,
|
||||
config=self._config,
|
||||
fingerprint_listener=fp_listener,
|
||||
)
|
||||
self._lock_instance.assign_window_to_monitor(window, monitor)
|
||||
window.present()
|
||||
self._windows.append(window)
|
||||
try:
|
||||
window = LockscreenWindow(
|
||||
application=self,
|
||||
unlock_callback=self._unlock,
|
||||
config=self._config,
|
||||
fingerprint_listener=fp_listener,
|
||||
)
|
||||
self._lock_instance.assign_window_to_monitor(window, monitor)
|
||||
window.present()
|
||||
self._windows.append(window)
|
||||
except Exception:
|
||||
logger.exception("Failed to create lockscreen window for monitor %d", i)
|
||||
|
||||
if not self._windows:
|
||||
logger.critical("No lockscreen windows created — unlocking session to prevent lockout")
|
||||
self._lock_instance.unlock()
|
||||
|
||||
def _activate_without_lock(self) -> None:
|
||||
"""Fallback for development — no session lock, just a window."""
|
||||
@ -90,19 +133,41 @@ class MoonlockApp(Gtk.Application):
|
||||
|
||||
def _load_css(self) -> None:
|
||||
"""Load the CSS stylesheet for the lockscreen."""
|
||||
css_provider = Gtk.CssProvider()
|
||||
css_path = files("moonlock") / "style.css"
|
||||
css_provider.load_from_path(str(css_path))
|
||||
Gtk.StyleContext.add_provider_for_display(
|
||||
Gdk.Display.get_default(),
|
||||
css_provider,
|
||||
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
|
||||
)
|
||||
try:
|
||||
css_provider = Gtk.CssProvider()
|
||||
css_path = files("moonlock") / "style.css"
|
||||
css_provider.load_from_path(str(css_path))
|
||||
Gtk.StyleContext.add_provider_for_display(
|
||||
Gdk.Display.get_default(),
|
||||
css_provider,
|
||||
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to load CSS stylesheet")
|
||||
|
||||
|
||||
def _install_crash_guard(app: MoonlockApp) -> None:
|
||||
"""Install a global exception handler that unlocks the session on crash."""
|
||||
_original_excepthook = sys.excepthook
|
||||
|
||||
def _crash_guard(exc_type, exc_value, exc_tb):
|
||||
logger.critical("Unhandled exception — unlocking session to prevent lockout", exc_info=(exc_type, exc_value, exc_tb))
|
||||
if app._lock_instance:
|
||||
try:
|
||||
app._lock_instance.unlock()
|
||||
except Exception:
|
||||
pass
|
||||
_original_excepthook(exc_type, exc_value, exc_tb)
|
||||
|
||||
sys.excepthook = _crash_guard
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run the Moonlock application."""
|
||||
_setup_logging()
|
||||
logger.info("Moonlock starting")
|
||||
app = MoonlockApp()
|
||||
_install_crash_guard(app)
|
||||
app.run(sys.argv)
|
||||
|
||||
|
||||
|
||||
@ -150,3 +150,82 @@ class TestFingerprintSignalHandling:
|
||||
on_failure.assert_not_called()
|
||||
# Should NOT restart — verification still in progress
|
||||
listener._device_proxy.VerifyStart.assert_not_called()
|
||||
|
||||
|
||||
class TestFingerprintStartErrorHandling:
|
||||
"""Tests for GLib.Error handling in start()."""
|
||||
|
||||
def test_claim_glib_error_logs_and_returns_without_starting(self):
|
||||
"""When Claim() raises GLib.Error, start() should not proceed."""
|
||||
from gi.repository import GLib
|
||||
|
||||
listener = FingerprintListener.__new__(FingerprintListener)
|
||||
listener._device_proxy = MagicMock()
|
||||
listener._device_path = "/dev/0"
|
||||
listener._signal_id = None
|
||||
listener._running = False
|
||||
listener._on_success = None
|
||||
listener._on_failure = None
|
||||
|
||||
listener._device_proxy.Claim.side_effect = GLib.Error(
|
||||
"net.reactivated.Fprint.Error.AlreadyClaimed"
|
||||
)
|
||||
|
||||
on_success = MagicMock()
|
||||
on_failure = MagicMock()
|
||||
|
||||
listener.start("testuser", on_success=on_success, on_failure=on_failure)
|
||||
|
||||
# Should NOT have connected signals or started verification
|
||||
listener._device_proxy.connect.assert_not_called()
|
||||
listener._device_proxy.VerifyStart.assert_not_called()
|
||||
assert listener._running is False
|
||||
|
||||
def test_verify_start_glib_error_disconnects_and_releases(self):
|
||||
"""When VerifyStart() raises GLib.Error, start() should clean up."""
|
||||
from gi.repository import GLib
|
||||
|
||||
listener = FingerprintListener.__new__(FingerprintListener)
|
||||
listener._device_proxy = MagicMock()
|
||||
listener._device_path = "/dev/0"
|
||||
listener._signal_id = None
|
||||
listener._running = False
|
||||
listener._on_success = None
|
||||
listener._on_failure = None
|
||||
|
||||
# Claim succeeds, signal connect returns an ID, VerifyStart fails
|
||||
listener._device_proxy.connect.return_value = 99
|
||||
listener._device_proxy.VerifyStart.side_effect = GLib.Error(
|
||||
"net.reactivated.Fprint.Error.Internal"
|
||||
)
|
||||
|
||||
on_success = MagicMock()
|
||||
on_failure = MagicMock()
|
||||
|
||||
listener.start("testuser", on_success=on_success, on_failure=on_failure)
|
||||
|
||||
# Should have disconnected the signal
|
||||
listener._device_proxy.disconnect.assert_called_once_with(99)
|
||||
# Should have released the device
|
||||
listener._device_proxy.Release.assert_called_once()
|
||||
assert listener._running is False
|
||||
assert listener._signal_id is None
|
||||
|
||||
def test_start_sets_running_true_only_on_success(self):
|
||||
"""_running should only be True after both Claim and VerifyStart succeed."""
|
||||
listener = FingerprintListener.__new__(FingerprintListener)
|
||||
listener._device_proxy = MagicMock()
|
||||
listener._device_path = "/dev/0"
|
||||
listener._signal_id = None
|
||||
listener._running = False
|
||||
listener._on_success = None
|
||||
listener._on_failure = None
|
||||
|
||||
listener._device_proxy.connect.return_value = 42
|
||||
|
||||
on_success = MagicMock()
|
||||
on_failure = MagicMock()
|
||||
|
||||
listener.start("testuser", on_success=on_success, on_failure=on_failure)
|
||||
|
||||
assert listener._running is True
|
||||
|
||||
242
tests/test_main.py
Normal file
242
tests/test_main.py
Normal file
@ -0,0 +1,242 @@
|
||||
# ABOUTME: Tests for the Moonlock application entry point.
|
||||
# ABOUTME: Covers logging setup, defensive window creation, and CSS error handling.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _mock_gtk(monkeypatch):
|
||||
"""Prevent GTK from requiring a display during test collection."""
|
||||
mock_gi = MagicMock()
|
||||
mock_gtk = MagicMock()
|
||||
mock_gdk = MagicMock()
|
||||
mock_session_lock = MagicMock()
|
||||
|
||||
# Pre-populate gi.repository with our mocks
|
||||
modules = {
|
||||
"gi": mock_gi,
|
||||
"gi.repository": MagicMock(Gtk=mock_gtk, Gdk=mock_gdk, Gtk4SessionLock=mock_session_lock),
|
||||
}
|
||||
|
||||
with monkeypatch.context() as m:
|
||||
# Only patch missing/problematic modules if not already loaded
|
||||
for mod_name, mod in modules.items():
|
||||
if mod_name not in sys.modules:
|
||||
m.setitem(sys.modules, mod_name, mod)
|
||||
yield
|
||||
|
||||
|
||||
def _import_main():
|
||||
"""Import main module lazily after GTK mocking is in place."""
|
||||
from moonlock.main import MoonlockApp, _setup_logging
|
||||
return MoonlockApp, _setup_logging
|
||||
|
||||
|
||||
class TestSetupLogging:
|
||||
"""Tests for the logging infrastructure."""
|
||||
|
||||
def test_setup_logging_adds_stderr_handler(self):
|
||||
"""_setup_logging() should add a StreamHandler to the root logger."""
|
||||
_, _setup_logging = _import_main()
|
||||
root = logging.getLogger()
|
||||
handlers_before = len(root.handlers)
|
||||
|
||||
_setup_logging()
|
||||
|
||||
assert len(root.handlers) > handlers_before
|
||||
|
||||
# Clean up handlers we added
|
||||
for handler in root.handlers[handlers_before:]:
|
||||
root.removeHandler(handler)
|
||||
|
||||
def test_setup_logging_sets_info_level(self):
|
||||
"""_setup_logging() should set root logger to INFO level."""
|
||||
_, _setup_logging = _import_main()
|
||||
root = logging.getLogger()
|
||||
original_level = root.level
|
||||
|
||||
_setup_logging()
|
||||
|
||||
assert root.level == logging.INFO
|
||||
|
||||
# Restore
|
||||
root.setLevel(original_level)
|
||||
for handler in root.handlers[:]:
|
||||
root.removeHandler(handler)
|
||||
|
||||
@patch("moonlock.main._LOG_DIR")
|
||||
@patch("logging.FileHandler")
|
||||
def test_setup_logging_adds_file_handler_when_dir_exists(self, mock_fh, mock_log_dir):
|
||||
"""_setup_logging() should add a FileHandler when log directory exists."""
|
||||
_, _setup_logging = _import_main()
|
||||
mock_log_dir.is_dir.return_value = True
|
||||
|
||||
root = logging.getLogger()
|
||||
handlers_before = len(root.handlers)
|
||||
|
||||
_setup_logging()
|
||||
|
||||
mock_fh.assert_called_once()
|
||||
|
||||
# Clean up
|
||||
for handler in root.handlers[handlers_before:]:
|
||||
root.removeHandler(handler)
|
||||
|
||||
@patch("moonlock.main._LOG_DIR")
|
||||
def test_setup_logging_skips_file_handler_when_dir_missing(self, mock_log_dir):
|
||||
"""_setup_logging() should not fail when log directory doesn't exist."""
|
||||
_, _setup_logging = _import_main()
|
||||
mock_log_dir.is_dir.return_value = False
|
||||
|
||||
root = logging.getLogger()
|
||||
handlers_before = len(root.handlers)
|
||||
|
||||
_setup_logging()
|
||||
|
||||
assert len(root.handlers) >= handlers_before
|
||||
|
||||
# Clean up
|
||||
for handler in root.handlers[handlers_before:]:
|
||||
root.removeHandler(handler)
|
||||
|
||||
|
||||
class TestCssErrorHandling:
|
||||
"""Tests for CSS loading error handling."""
|
||||
|
||||
@patch("moonlock.main.Gdk.Display.get_default")
|
||||
@patch("moonlock.main.Gtk.CssProvider")
|
||||
@patch("moonlock.main.files")
|
||||
def test_load_css_logs_error_on_exception(self, mock_files, mock_css_cls, mock_display):
|
||||
"""CSS loading errors should be logged, not raised."""
|
||||
MoonlockApp, _ = _import_main()
|
||||
mock_files.return_value.__truediv__ = MagicMock(return_value=Path("/nonexistent"))
|
||||
mock_css = MagicMock()
|
||||
mock_css.load_from_path.side_effect = Exception("CSS parse error")
|
||||
mock_css_cls.return_value = mock_css
|
||||
|
||||
app = MoonlockApp.__new__(MoonlockApp)
|
||||
app._config = MagicMock()
|
||||
|
||||
# Should not raise
|
||||
with patch("moonlock.main.logger") as mock_logger:
|
||||
app._load_css()
|
||||
mock_logger.exception.assert_called_once()
|
||||
|
||||
|
||||
class TestDefensiveWindowCreation:
|
||||
"""Tests for defensive window creation in session lock mode."""
|
||||
|
||||
@patch("moonlock.main.LockscreenWindow")
|
||||
@patch("moonlock.main.FingerprintListener")
|
||||
@patch("moonlock.main.Gdk.Display.get_default")
|
||||
@patch("moonlock.main.Gtk4SessionLock")
|
||||
def test_single_window_failure_does_not_stop_other_windows(
|
||||
self, mock_session_lock, mock_display, mock_fp, mock_window_cls
|
||||
):
|
||||
"""If one window fails, others should still be created."""
|
||||
MoonlockApp, _ = _import_main()
|
||||
app = MoonlockApp.__new__(MoonlockApp)
|
||||
app._config = MagicMock()
|
||||
app._windows = []
|
||||
|
||||
# Two monitors
|
||||
monitor1 = MagicMock()
|
||||
monitor2 = MagicMock()
|
||||
monitors = MagicMock()
|
||||
monitors.get_n_items.return_value = 2
|
||||
monitors.get_item.side_effect = [monitor1, monitor2]
|
||||
mock_display.return_value.get_monitors.return_value = monitors
|
||||
|
||||
lock_instance = MagicMock()
|
||||
mock_session_lock.Instance.new.return_value = lock_instance
|
||||
app._lock_instance = lock_instance
|
||||
|
||||
# First window creation fails, second succeeds
|
||||
window_ok = MagicMock()
|
||||
mock_window_cls.side_effect = [Exception("GTK error"), window_ok]
|
||||
|
||||
with patch("moonlock.main.logger"):
|
||||
app._activate_with_session_lock()
|
||||
|
||||
# One window should have been created despite the first failure
|
||||
assert len(app._windows) == 1
|
||||
|
||||
@patch("moonlock.main.LockscreenWindow")
|
||||
@patch("moonlock.main.FingerprintListener")
|
||||
@patch("moonlock.main.Gdk.Display.get_default")
|
||||
@patch("moonlock.main.Gtk4SessionLock")
|
||||
def test_all_windows_fail_unlocks_session(
|
||||
self, mock_session_lock, mock_display, mock_fp, mock_window_cls
|
||||
):
|
||||
"""If ALL windows fail to create, session should be unlocked to prevent lockout."""
|
||||
MoonlockApp, _ = _import_main()
|
||||
app = MoonlockApp.__new__(MoonlockApp)
|
||||
app._config = MagicMock()
|
||||
app._windows = []
|
||||
|
||||
# One monitor
|
||||
monitors = MagicMock()
|
||||
monitors.get_n_items.return_value = 1
|
||||
monitors.get_item.return_value = MagicMock()
|
||||
mock_display.return_value.get_monitors.return_value = monitors
|
||||
|
||||
lock_instance = MagicMock()
|
||||
mock_session_lock.Instance.new.return_value = lock_instance
|
||||
app._lock_instance = lock_instance
|
||||
|
||||
# Window creation fails
|
||||
mock_window_cls.side_effect = Exception("GTK error")
|
||||
|
||||
with patch("moonlock.main.logger"):
|
||||
app._activate_with_session_lock()
|
||||
|
||||
# Session should have been unlocked to prevent permanent lockout
|
||||
lock_instance.unlock.assert_called_once()
|
||||
|
||||
|
||||
class TestCrashGuard:
|
||||
"""Tests for the global exception handler that prevents lockout on crash."""
|
||||
|
||||
def test_crash_guard_unlocks_session_on_unhandled_exception(self):
|
||||
"""sys.excepthook should unlock the session when an exception reaches it."""
|
||||
MoonlockApp, _ = _import_main()
|
||||
from moonlock.main import _install_crash_guard
|
||||
|
||||
app = MoonlockApp.__new__(MoonlockApp)
|
||||
app._lock_instance = MagicMock()
|
||||
|
||||
original_hook = sys.excepthook
|
||||
_install_crash_guard(app)
|
||||
|
||||
try:
|
||||
# Simulate an unhandled exception reaching the hook
|
||||
with patch("moonlock.main.logger"):
|
||||
sys.excepthook(RuntimeError, RuntimeError("segfault"), None)
|
||||
|
||||
app._lock_instance.unlock.assert_called_once()
|
||||
finally:
|
||||
sys.excepthook = original_hook
|
||||
|
||||
def test_crash_guard_survives_unlock_failure(self):
|
||||
"""Crash guard should not raise even if unlock() itself fails."""
|
||||
MoonlockApp, _ = _import_main()
|
||||
from moonlock.main import _install_crash_guard
|
||||
|
||||
app = MoonlockApp.__new__(MoonlockApp)
|
||||
app._lock_instance = MagicMock()
|
||||
app._lock_instance.unlock.side_effect = Exception("protocol error")
|
||||
|
||||
original_hook = sys.excepthook
|
||||
_install_crash_guard(app)
|
||||
|
||||
try:
|
||||
with patch("moonlock.main.logger"):
|
||||
# Should not raise despite unlock failure
|
||||
sys.excepthook(RuntimeError, RuntimeError("crash"), None)
|
||||
finally:
|
||||
sys.excepthook = original_hook
|
||||
Loading…
x
Reference in New Issue
Block a user