fix: Audit-Findings — Realize-Handler, Thread-Safety, Input-Validierung

- _on_realize implementiert (war nur connected, nicht definiert)
- do_unrealize für as_file() Context-Manager-Cleanup
- threading.Lock für _greetd_sock Race Condition
- TOML-Parsing-Fehler abfangen statt Crash
- last-user Datei: Längen- und Zeichenvalidierung
- detect_locale: non-alpha LANG-Werte abweisen
- exec_cmd Plausibility-Check mit shutil.which
- Exception-Details ins Log statt in die UI
- subprocess.run Timeout für Power-Actions
- Sequence[Path] statt tuple[Path, ...] in get_sessions
- Mock-Server _recvall für fragmentierte Reads
- [behavior]-Config-Sektion entfernt (unimplementiert)
- Design Decisions in CLAUDE.md dokumentiert
This commit is contained in:
nevaforget 2026-03-26 12:56:52 +01:00
parent 8b1608f99d
commit 4cd73a430b
11 changed files with 138 additions and 42 deletions

View File

@ -44,3 +44,8 @@ uv run moongreet
- `i18n.py` — Locale-Erkennung (LANG / /etc/locale.conf) und String-Tabellen (DE/EN) - `i18n.py` — Locale-Erkennung (LANG / /etc/locale.conf) und String-Tabellen (DE/EN)
- `greeter.py` — GTK4 UI (Overlay-Layout), Faillock-Warnung nach 2 Fehlversuchen - `greeter.py` — GTK4 UI (Overlay-Layout), Faillock-Warnung nach 2 Fehlversuchen
- `main.py` — Entry Point, GTK App, Layer Shell Setup - `main.py` — Entry Point, GTK App, Layer Shell Setup
## Design Decisions
- **Synchrones I/O im GTK-Konstruktor**: `load_config`, `load_strings`, `get_users` und `get_sessions` laufen synchron in `GreeterWindow.__init__`. Async Loading mit Placeholder-UI wäre möglich, erhöht aber die Komplexität erheblich. Der Greeter startet 1x pro Boot auf lokaler Hardware — die Daten sind klein (passwd, locale.conf, wenige .desktop-Files), die Latenz im Normalfall vernachlässigbar.
- **Synchrones Avatar-Decoding**: `GdkPixbuf.Pixbuf.new_from_file_at_scale` läuft synchron auf dem Main Thread. Bei großen Bildern als `.face`-Datei kann die UI kurz stocken. Der Avatar-Cache (`_avatar_cache`) federt das nach dem ersten Laden ab. Async Decoding per Worker-Thread + `GLib.idle_add` wäre die Alternative, rechtfertigt aber den Aufwand nicht für einen Single-User-Greeter.

View File

@ -4,7 +4,3 @@
[appearance] [appearance]
# Absolute path to wallpaper image # Absolute path to wallpaper image
background = "/usr/share/backgrounds/wallpaper.jpg" background = "/usr/share/backgrounds/wallpaper.jpg"
[behavior]
# show_user_list = true
# default_session = "Hyprland"

View File

@ -33,8 +33,11 @@ def load_config(config_path: Path | None = None) -> Config:
if not config_path.exists(): if not config_path.exists():
return Config() return Config()
with open(config_path, "rb") as f: try:
data = tomllib.load(f) with open(config_path, "rb") as f:
data = tomllib.load(f)
except (tomllib.TOMLDecodeError, OSError):
return Config()
config = Config() config = Config()
appearance = data.get("appearance", {}) appearance = data.get("appearance", {})

View File

@ -1,8 +1,11 @@
# ABOUTME: Main greeter window — builds the GTK4 UI layout for the Moongreet greeter. # ABOUTME: Main greeter window — builds the GTK4 UI layout for the Moongreet greeter.
# ABOUTME: Handles user selection, session choice, password entry, and power actions. # ABOUTME: Handles user selection, session choice, password entry, and power actions.
import logging
import os import os
import re
import shlex import shlex
import shutil
import socket import socket
import stat import stat
import subprocess import subprocess
@ -22,8 +25,12 @@ from moongreet.users import User, get_users, get_avatar_path, get_user_gtk_theme
from moongreet.sessions import Session, get_sessions from moongreet.sessions import Session, get_sessions
from moongreet.power import reboot, shutdown from moongreet.power import reboot, shutdown
logger = logging.getLogger(__name__)
LAST_USER_PATH = Path("/var/cache/moongreet/last-user") LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
FAILLOCK_MAX_ATTEMPTS = 3 FAILLOCK_MAX_ATTEMPTS = 3
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_.-]+$")
MAX_USERNAME_LENGTH = 256
PACKAGE_DATA = files("moongreet") / "data" PACKAGE_DATA = files("moongreet") / "data"
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg" DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
DEFAULT_WALLPAPER_PATH = PACKAGE_DATA / "wallpaper.jpg" DEFAULT_WALLPAPER_PATH = PACKAGE_DATA / "wallpaper.jpg"
@ -56,13 +63,33 @@ class GreeterWindow(Gtk.ApplicationWindow):
self._sessions = get_sessions() self._sessions = get_sessions()
self._selected_user: User | None = None self._selected_user: User | None = None
self._greetd_sock: socket.socket | None = None self._greetd_sock: socket.socket | None = None
self._greetd_sock_lock = threading.Lock()
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {} self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
self._failed_attempts: dict[str, int] = {} self._failed_attempts: dict[str, int] = {}
self._wallpaper_ctx = None
self._build_ui() self._build_ui()
self._select_initial_user()
self._setup_keyboard_navigation() self._setup_keyboard_navigation()
# Defer initial user selection until the window is realized,
# so get_color() returns the actual theme foreground for SVG tinting
self.connect("realize", self._on_realize)
def _on_realize(self, widget: Gtk.Widget) -> None:
"""Called when the window is realized — select initial user.
Deferred from __init__ so get_color() returns actual theme values
for SVG tinting. Uses idle_add so the first frame renders before
avatar loading blocks the main loop.
"""
GLib.idle_add(self._select_initial_user)
def do_unrealize(self) -> None:
"""Clean up resources when the window is unrealized."""
if self._wallpaper_ctx is not None:
self._wallpaper_ctx.__exit__(None, None, None)
self._wallpaper_ctx = None
super().do_unrealize()
def _build_ui(self) -> None: def _build_ui(self) -> None:
"""Build the complete greeter UI layout.""" """Build the complete greeter UI layout."""
@ -375,12 +402,13 @@ class GreeterWindow(Gtk.ApplicationWindow):
def _close_greetd_sock(self) -> None: def _close_greetd_sock(self) -> None:
"""Close the greetd socket and reset the reference.""" """Close the greetd socket and reset the reference."""
if self._greetd_sock: with self._greetd_sock_lock:
try: if self._greetd_sock:
self._greetd_sock.close() try:
except OSError: self._greetd_sock.close()
pass except OSError:
self._greetd_sock = None pass
self._greetd_sock = None
def _set_login_sensitive(self, sensitive: bool) -> None: def _set_login_sensitive(self, sensitive: bool) -> None:
"""Enable or disable login controls during authentication.""" """Enable or disable login controls during authentication."""
@ -412,7 +440,8 @@ class GreeterWindow(Gtk.ApplicationWindow):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10.0) sock.settimeout(10.0)
sock.connect(sock_path) sock.connect(sock_path)
self._greetd_sock = sock with self._greetd_sock_lock:
self._greetd_sock = sock
# Step 1: Create session # Step 1: Create session
response = create_session(sock, user.username) response = create_session(sock, user.username)
@ -440,7 +469,7 @@ class GreeterWindow(Gtk.ApplicationWindow):
# Step 3: Start session # Step 3: Start session
if response.get("type") == "success": if response.get("type") == "success":
cmd = shlex.split(session.exec_cmd) cmd = shlex.split(session.exec_cmd)
if not cmd: if not cmd or not shutil.which(cmd[0]):
cancel_session(sock) cancel_session(sock)
GLib.idle_add(self._on_login_error, None, self._strings.invalid_session_command) GLib.idle_add(self._on_login_error, None, self._strings.invalid_session_command)
return return
@ -457,11 +486,13 @@ class GreeterWindow(Gtk.ApplicationWindow):
self._close_greetd_sock() self._close_greetd_sock()
except ConnectionError as e: except ConnectionError as e:
logger.error("greetd connection error: %s", e)
self._close_greetd_sock() self._close_greetd_sock()
GLib.idle_add(self._on_login_error, None, self._strings.connection_error.format(error=e)) GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
except (OSError, ValueError) as e: except (OSError, ValueError) as e:
logger.error("greetd socket error: %s", e)
self._close_greetd_sock() self._close_greetd_sock()
GLib.idle_add(self._on_login_error, None, self._strings.socket_error.format(error=e)) GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
def _on_login_error(self, response: dict | None, message: str) -> None: def _on_login_error(self, response: dict | None, message: str) -> None:
"""Handle login error on the GTK main thread.""" """Handle login error on the GTK main thread."""
@ -483,12 +514,13 @@ class GreeterWindow(Gtk.ApplicationWindow):
def _cancel_pending_session(self) -> None: def _cancel_pending_session(self) -> None:
"""Cancel any in-progress greetd session.""" """Cancel any in-progress greetd session."""
if self._greetd_sock: with self._greetd_sock_lock:
try: if self._greetd_sock:
cancel_session(self._greetd_sock) try:
except (ConnectionError, OSError): cancel_session(self._greetd_sock)
pass except (ConnectionError, OSError):
self._close_greetd_sock() pass
self._close_greetd_sock()
def _get_selected_session(self) -> Session | None: def _get_selected_session(self) -> Session | None:
"""Get the currently selected session from the dropdown.""" """Get the currently selected session from the dropdown."""
@ -535,9 +567,12 @@ class GreeterWindow(Gtk.ApplicationWindow):
"""Load the last logged-in username from cache.""" """Load the last logged-in username from cache."""
if LAST_USER_PATH.exists(): if LAST_USER_PATH.exists():
try: try:
return LAST_USER_PATH.read_text().strip() username = LAST_USER_PATH.read_text().strip()
except OSError: except OSError:
return None return None
if len(username) > MAX_USERNAME_LENGTH or not VALID_USERNAME.match(username):
return None
return username
return None return None
@staticmethod @staticmethod

View File

@ -31,9 +31,11 @@ class Strings:
reboot_failed: str reboot_failed: str
shutdown_failed: str shutdown_failed: str
# Templates (use .format()) # Error messages (continued)
connection_error: str connection_error: str
socket_error: str socket_error: str
# Templates (use .format())
faillock_attempts_remaining: str faillock_attempts_remaining: str
faillock_locked: str faillock_locked: str
@ -54,8 +56,8 @@ _STRINGS_DE = Strings(
session_start_failed="Session konnte nicht gestartet werden", session_start_failed="Session konnte nicht gestartet werden",
reboot_failed="Neustart fehlgeschlagen", reboot_failed="Neustart fehlgeschlagen",
shutdown_failed="Herunterfahren fehlgeschlagen", shutdown_failed="Herunterfahren fehlgeschlagen",
connection_error="Verbindungsfehler: {error}", connection_error="Verbindungsfehler",
socket_error="Socket-Fehler: {error}", socket_error="Socket-Fehler",
faillock_attempts_remaining="Noch {n} Versuch(e) vor Kontosperrung!", faillock_attempts_remaining="Noch {n} Versuch(e) vor Kontosperrung!",
faillock_locked="Konto ist möglicherweise gesperrt", faillock_locked="Konto ist möglicherweise gesperrt",
) )
@ -76,8 +78,8 @@ _STRINGS_EN = Strings(
session_start_failed="Failed to start session", session_start_failed="Failed to start session",
reboot_failed="Reboot failed", reboot_failed="Reboot failed",
shutdown_failed="Shutdown failed", shutdown_failed="Shutdown failed",
connection_error="Connection error: {error}", connection_error="Connection error",
socket_error="Socket error: {error}", socket_error="Socket error",
faillock_attempts_remaining="{n} attempt(s) remaining before lockout!", faillock_attempts_remaining="{n} attempt(s) remaining before lockout!",
faillock_locked="Account may be locked", faillock_locked="Account may be locked",
) )
@ -102,7 +104,9 @@ def detect_locale(locale_conf_path: Path = DEFAULT_LOCALE_CONF) -> str:
return "en" return "en"
# Extract language prefix: "de_DE.UTF-8" → "de" # Extract language prefix: "de_DE.UTF-8" → "de"
lang = lang.split("_")[0].split(".")[0] lang = lang.split("_")[0].split(".")[0].lower()
if not lang.isalpha():
return "en"
return lang return lang

View File

@ -4,11 +4,14 @@
import subprocess import subprocess
POWER_TIMEOUT = 30
def reboot() -> None: def reboot() -> None:
"""Reboot the system via loginctl.""" """Reboot the system via loginctl."""
subprocess.run(["loginctl", "reboot"], check=True) subprocess.run(["loginctl", "reboot"], check=True, timeout=POWER_TIMEOUT)
def shutdown() -> None: def shutdown() -> None:
"""Shut down the system via loginctl.""" """Shut down the system via loginctl."""
subprocess.run(["loginctl", "poweroff"], check=True) subprocess.run(["loginctl", "poweroff"], check=True, timeout=POWER_TIMEOUT)

View File

@ -2,6 +2,7 @@
# ABOUTME: Parses .desktop files from standard session directories. # ABOUTME: Parses .desktop files from standard session directories.
import configparser import configparser
from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@ -37,8 +38,8 @@ def _parse_desktop_file(path: Path, session_type: str) -> Session | None:
def get_sessions( def get_sessions(
wayland_dirs: tuple[Path, ...] = DEFAULT_WAYLAND_DIRS, wayland_dirs: Sequence[Path] = DEFAULT_WAYLAND_DIRS,
xsession_dirs: tuple[Path, ...] = DEFAULT_XSESSION_DIRS, xsession_dirs: Sequence[Path] = DEFAULT_XSESSION_DIRS,
) -> list[Session]: ) -> list[Session]:
"""Discover available sessions from .desktop files.""" """Discover available sessions from .desktop files."""
sessions: list[Session] = [] sessions: list[Session] = []

View File

@ -35,6 +35,14 @@ class TestLoadConfig:
assert config.background is None assert config.background is None
def test_returns_defaults_for_corrupt_toml(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text("this is not valid [[[ toml !!!")
config = load_config(toml_file)
assert config.background is None
def test_resolves_relative_path_against_config_dir(self, tmp_path: Path) -> None: def test_resolves_relative_path_against_config_dir(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml" toml_file = tmp_path / "moongreet.toml"
toml_file.write_text( toml_file.write_text(

View File

@ -63,6 +63,13 @@ class TestDetectLocale:
assert result == "en" assert result == "en"
def test_rejects_non_alpha_lang(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "../../etc")
result = detect_locale()
assert result == "en"
class TestLoadStrings: class TestLoadStrings:
"""Tests for loading the correct string table.""" """Tests for loading the correct string table."""
@ -111,8 +118,9 @@ class TestLoadStrings:
result = strings.faillock_attempts_remaining.format(n=1) result = strings.faillock_attempts_remaining.format(n=1)
assert "1" in result assert "1" in result
def test_connection_error_template(self) -> None: def test_connection_error_is_generic(self) -> None:
strings = load_strings("en") strings = load_strings("en")
result = strings.connection_error.format(error="timeout") # Error messages should not contain format placeholders (no info leakage)
assert "timeout" in result assert "{" not in strings.connection_error
assert "{" not in strings.socket_error

View File

@ -40,16 +40,27 @@ class MockGreetd:
self._thread = threading.Thread(target=self._serve, daemon=True) self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start() self._thread.start()
@staticmethod
def _recvall(conn: socket.socket, n: int) -> bytes:
"""Receive exactly n bytes from a socket, handling fragmented reads."""
buf = bytearray()
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
break
buf.extend(chunk)
return bytes(buf)
def _serve(self) -> None: def _serve(self) -> None:
conn, _ = self._server.accept() conn, _ = self._server.accept()
try: try:
for response in self._responses: for response in self._responses:
# Receive a message # Receive a message
header = conn.recv(4) header = self._recvall(conn, 4)
if len(header) < 4: if len(header) < 4:
break break
length = struct.unpack("!I", header)[0] length = struct.unpack("!I", header)[0]
payload = conn.recv(length) payload = self._recvall(conn, length)
msg = json.loads(payload.decode("utf-8")) msg = json.loads(payload.decode("utf-8"))
self._received.append(msg) self._received.append(msg)
@ -182,6 +193,10 @@ class TestLoginFlow:
class TestFaillockWarning: class TestFaillockWarning:
"""Tests for the faillock warning message logic.""" """Tests for the faillock warning message logic."""
def test_no_warning_on_zero_attempts(self) -> None:
strings = load_strings("de")
assert faillock_warning(0, strings) is None
def test_no_warning_on_first_attempt(self) -> None: def test_no_warning_on_first_attempt(self) -> None:
strings = load_strings("de") strings = load_strings("de")
assert faillock_warning(1, strings) is None assert faillock_warning(1, strings) is None
@ -231,3 +246,21 @@ class TestLastUser:
from moongreet.greeter import GreeterWindow from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user() result = GreeterWindow._load_last_user()
assert result is None assert result is None
def test_load_last_user_rejects_oversized_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "last-user"
cache_path.write_text("a" * 300)
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user()
assert result is None
def test_load_last_user_rejects_invalid_characters(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "last-user"
cache_path.write_text("../../etc/passwd")
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user()
assert result is None

View File

@ -6,7 +6,7 @@ from unittest.mock import patch, call
import pytest import pytest
from moongreet.power import reboot, shutdown from moongreet.power import reboot, shutdown, POWER_TIMEOUT
class TestReboot: class TestReboot:
@ -17,7 +17,7 @@ class TestReboot:
reboot() reboot()
mock_run.assert_called_once_with( mock_run.assert_called_once_with(
["loginctl", "reboot"], check=True ["loginctl", "reboot"], check=True, timeout=POWER_TIMEOUT
) )
@patch("moongreet.power.subprocess.run") @patch("moongreet.power.subprocess.run")
@ -36,7 +36,7 @@ class TestShutdown:
shutdown() shutdown()
mock_run.assert_called_once_with( mock_run.assert_called_once_with(
["loginctl", "poweroff"], check=True ["loginctl", "poweroff"], check=True, timeout=POWER_TIMEOUT
) )
@patch("moongreet.power.subprocess.run") @patch("moongreet.power.subprocess.run")