Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6907db0c2a | |||
| 64f08d7e8b | |||
| cab1997dff | |||
| 3dfa596f9a | |||
| 357d2459cf | |||
| ba4f30f254 | |||
| e37b273913 | |||
| ecd89f5b10 | |||
| d089fa201c | |||
| 6400270a50 | |||
| 10b613b50b |
@@ -8,3 +8,9 @@ build/
|
|||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
.pyright/
|
.pyright/
|
||||||
*.egg
|
*.egg
|
||||||
|
|
||||||
|
# makepkg build artifacts
|
||||||
|
pkg/src/
|
||||||
|
pkg/pkg/
|
||||||
|
pkg/*.pkg.tar*
|
||||||
|
pkg/greetd-moongreet/
|
||||||
|
|||||||
+3
-1
@@ -5,5 +5,7 @@
|
|||||||
vt = 1
|
vt = 1
|
||||||
|
|
||||||
[default_session]
|
[default_session]
|
||||||
command = "moongreet"
|
# Moongreet braucht einen Wayland-Compositor — niri stellt diesen bereit.
|
||||||
|
# Siehe niri-greeter.kdl fuer die Compositor-Konfiguration.
|
||||||
|
command = "niri -c /etc/greetd/niri-greeter.kdl"
|
||||||
user = "greeter"
|
user = "greeter"
|
||||||
|
|||||||
@@ -4,3 +4,5 @@
|
|||||||
[appearance]
|
[appearance]
|
||||||
# Absolute path to wallpaper image
|
# Absolute path to wallpaper image
|
||||||
background = "/usr/share/backgrounds/wallpaper.jpg"
|
background = "/usr/share/backgrounds/wallpaper.jpg"
|
||||||
|
# GTK theme for the greeter UI
|
||||||
|
gtk-theme = "catppuccin-mocha-lavender-standard+default"
|
||||||
|
|||||||
@@ -0,0 +1,59 @@
|
|||||||
|
// ABOUTME: Niri-Konfiguration fuer den Moongreet Login-Greeter.
|
||||||
|
// ABOUTME: Wird von greetd gestartet — minimale Config ohne Keybinds fuer Sicherheit.
|
||||||
|
|
||||||
|
input {
|
||||||
|
keyboard {
|
||||||
|
xkb {
|
||||||
|
layout "de"
|
||||||
|
}
|
||||||
|
numlock
|
||||||
|
}
|
||||||
|
|
||||||
|
touchpad {
|
||||||
|
tap
|
||||||
|
natural-scroll
|
||||||
|
}
|
||||||
|
|
||||||
|
mouse {
|
||||||
|
accel-profile "flat"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cursor {
|
||||||
|
xcursor-theme "Sweet-cursors"
|
||||||
|
}
|
||||||
|
|
||||||
|
layout {
|
||||||
|
gaps 0
|
||||||
|
|
||||||
|
focus-ring {
|
||||||
|
off
|
||||||
|
}
|
||||||
|
|
||||||
|
border {
|
||||||
|
off
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Moongreet starten und niri beenden, sobald moongreet sich schliesst.
|
||||||
|
// Retry-Loop stellt sicher, dass niri auch bei fruehen Crashes von moongreet beendet wird.
|
||||||
|
spawn-sh-at-startup "moongreet; while ! niri msg action quit --skip-confirmation 2>/dev/null; do sleep 0.5; done"
|
||||||
|
|
||||||
|
// Greeter-Fenster maximiert darstellen
|
||||||
|
window-rule {
|
||||||
|
open-maximized true
|
||||||
|
}
|
||||||
|
|
||||||
|
hotkey-overlay {
|
||||||
|
skip-at-startup
|
||||||
|
}
|
||||||
|
|
||||||
|
prefer-no-csd
|
||||||
|
|
||||||
|
animations {
|
||||||
|
off
|
||||||
|
}
|
||||||
|
|
||||||
|
binds {
|
||||||
|
// Keine Keybinds — verhindert Zugriff auf Terminals oder andere Aktionen
|
||||||
|
}
|
||||||
+16
-7
@@ -1,10 +1,10 @@
|
|||||||
# ABOUTME: AUR PKGBUILD for Moongreet — greetd greeter for Wayland.
|
# ABOUTME: PKGBUILD for Moongreet — greetd greeter for Wayland.
|
||||||
# ABOUTME: Builds from git source, installs config and cache directory.
|
# ABOUTME: Builds from git source with automatic version detection.
|
||||||
|
|
||||||
# Maintainer: Dominik Kressler
|
# Maintainer: Dominik Kressler
|
||||||
|
|
||||||
pkgname=moongreet
|
pkgname=moongreet-git
|
||||||
pkgver=0.1.0
|
pkgver=0.2.0.r0.g64f08d7
|
||||||
pkgrel=1
|
pkgrel=1
|
||||||
pkgdesc="A greetd greeter for Wayland, built with Python + GTK4 + gtk4-layer-shell"
|
pkgdesc="A greetd greeter for Wayland, built with Python + GTK4 + gtk4-layer-shell"
|
||||||
arch=('any')
|
arch=('any')
|
||||||
@@ -23,12 +23,20 @@ makedepends=(
|
|||||||
'python-installer'
|
'python-installer'
|
||||||
'python-hatchling'
|
'python-hatchling'
|
||||||
)
|
)
|
||||||
|
provides=('moongreet')
|
||||||
|
conflicts=('moongreet')
|
||||||
install=moongreet.install
|
install=moongreet.install
|
||||||
source=("git+${url}.git#tag=v${pkgver}")
|
source=("git+${url}.git")
|
||||||
sha256sums=('SKIP')
|
sha256sums=('SKIP')
|
||||||
|
|
||||||
|
pkgver() {
|
||||||
|
cd "$srcdir/greetd-moongreet"
|
||||||
|
git describe --long --tags | sed 's/^v//;s/-/.r/;s/-/./'
|
||||||
|
}
|
||||||
|
|
||||||
build() {
|
build() {
|
||||||
cd "$srcdir/greetd-moongreet"
|
cd "$srcdir/greetd-moongreet"
|
||||||
|
rm -rf dist/
|
||||||
python -m build --wheel --no-isolation
|
python -m build --wheel --no-isolation
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,9 +44,10 @@ package() {
|
|||||||
cd "$srcdir/greetd-moongreet"
|
cd "$srcdir/greetd-moongreet"
|
||||||
python -m installer --destdir="$pkgdir" dist/*.whl
|
python -m installer --destdir="$pkgdir" dist/*.whl
|
||||||
|
|
||||||
# Example config
|
# Greeter config
|
||||||
install -Dm644 config/moongreet.toml "$pkgdir/etc/moongreet/moongreet.toml"
|
install -Dm644 config/moongreet.toml "$pkgdir/etc/moongreet/moongreet.toml"
|
||||||
|
|
||||||
# Cache directory
|
# Cache directories
|
||||||
install -dm755 "$pkgdir/var/cache/moongreet"
|
install -dm755 "$pkgdir/var/cache/moongreet"
|
||||||
|
install -dm755 "$pkgdir/var/cache/moongreet/last-session"
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,15 @@
|
|||||||
# ABOUTME: pacman install hooks for Moongreet.
|
# ABOUTME: pacman install hooks for Moongreet.
|
||||||
# ABOUTME: Sets ownership on cache directory for the greeter user.
|
# ABOUTME: Sets ownership on cache directory and prints setup instructions.
|
||||||
|
|
||||||
post_install() {
|
post_install() {
|
||||||
if getent passwd greeter > /dev/null 2>&1; then
|
if getent passwd greeter > /dev/null 2>&1; then
|
||||||
chown greeter:greeter /var/cache/moongreet
|
chown greeter:greeter /var/cache/moongreet
|
||||||
|
chown greeter:greeter /var/cache/moongreet/last-session
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "==> Copy /etc/moongreet/moongreet.toml and adjust the wallpaper path."
|
echo "==> Moongreet installed."
|
||||||
echo "==> Configure greetd to use moongreet:"
|
echo "==> Add moongreet to your greeter compositor command in /etc/greetd/config.toml."
|
||||||
echo " [default_session]"
|
echo "==> Adjust wallpaper: /etc/moongreet/moongreet.toml"
|
||||||
echo " command = \"moongreet\""
|
|
||||||
echo " user = \"greeter\""
|
|
||||||
}
|
}
|
||||||
|
|
||||||
post_upgrade() {
|
post_upgrade() {
|
||||||
|
|||||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "moongreet"
|
name = "moongreet"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
description = "A greetd greeter for Wayland with GTK4"
|
description = "A greetd greeter for Wayland with GTK4"
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|||||||
+10
-1
@@ -17,6 +17,7 @@ class Config:
|
|||||||
"""Greeter configuration loaded from moongreet.toml."""
|
"""Greeter configuration loaded from moongreet.toml."""
|
||||||
|
|
||||||
background: Path | None = None
|
background: Path | None = None
|
||||||
|
gtk_theme: str | None = None
|
||||||
|
|
||||||
|
|
||||||
def load_config(config_path: Path | None = None) -> Config:
|
def load_config(config_path: Path | None = None) -> Config:
|
||||||
@@ -51,6 +52,10 @@ def load_config(config_path: Path | None = None) -> Config:
|
|||||||
bg_path = config_path.parent / bg_path
|
bg_path = config_path.parent / bg_path
|
||||||
config.background = bg_path
|
config.background = bg_path
|
||||||
|
|
||||||
|
gtk_theme = appearance.get("gtk-theme")
|
||||||
|
if gtk_theme:
|
||||||
|
config.gtk_theme = gtk_theme
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
|
|
||||||
@@ -71,5 +76,9 @@ def resolve_wallpaper_path(
|
|||||||
return config.background, None
|
return config.background, None
|
||||||
|
|
||||||
ctx = as_file(_DEFAULT_WALLPAPER_PATH)
|
ctx = as_file(_DEFAULT_WALLPAPER_PATH)
|
||||||
path = ctx.__enter__()
|
try:
|
||||||
|
path = ctx.__enter__()
|
||||||
|
except Exception:
|
||||||
|
ctx.__exit__(None, None, None)
|
||||||
|
raise
|
||||||
return path, ctx
|
return path, ctx
|
||||||
|
|||||||
+125
-48
@@ -18,22 +18,24 @@ gi.require_version("Gtk", "4.0")
|
|||||||
gi.require_version("Gdk", "4.0")
|
gi.require_version("Gdk", "4.0")
|
||||||
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
|
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
|
||||||
|
|
||||||
from moongreet.config import load_config, resolve_wallpaper_path
|
from moongreet.config import Config, load_config, resolve_wallpaper_path
|
||||||
from moongreet.i18n import load_strings, Strings
|
from moongreet.i18n import load_strings, Strings
|
||||||
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
||||||
from moongreet.users import User, get_users, get_avatar_path, get_user_gtk_theme
|
from moongreet.users import User, get_users, get_avatar_path
|
||||||
from moongreet.sessions import Session, get_sessions
|
from moongreet.sessions import Session, get_sessions
|
||||||
from moongreet.power import reboot, shutdown
|
from moongreet.power import reboot, shutdown
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
|
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
|
||||||
|
LAST_SESSION_DIR = Path("/var/cache/moongreet/last-session")
|
||||||
FAILLOCK_MAX_ATTEMPTS = 3
|
FAILLOCK_MAX_ATTEMPTS = 3
|
||||||
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_.-]+$")
|
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]*$")
|
||||||
MAX_USERNAME_LENGTH = 256
|
MAX_USERNAME_LENGTH = 256
|
||||||
PACKAGE_DATA = files("moongreet") / "data"
|
PACKAGE_DATA = files("moongreet") / "data"
|
||||||
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
|
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
|
||||||
AVATAR_SIZE = 128
|
AVATAR_SIZE = 128
|
||||||
|
MAX_AVATAR_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
|
||||||
|
|
||||||
|
|
||||||
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
|
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
|
||||||
@@ -76,23 +78,25 @@ class WallpaperWindow(Gtk.ApplicationWindow):
|
|||||||
class GreeterWindow(Gtk.ApplicationWindow):
|
class GreeterWindow(Gtk.ApplicationWindow):
|
||||||
"""The main greeter window with login UI."""
|
"""The main greeter window with login UI."""
|
||||||
|
|
||||||
def __init__(self, bg_path: Path | None = None, **kwargs) -> None:
|
def __init__(self, bg_path: Path | None = None, config: Config | None = None, **kwargs) -> None:
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.add_css_class("greeter")
|
self.add_css_class("greeter")
|
||||||
self.set_default_size(1920, 1080)
|
self.set_default_size(1920, 1080)
|
||||||
|
|
||||||
self._config = load_config()
|
self._config = config if config is not None else load_config()
|
||||||
self._strings = load_strings()
|
self._strings = load_strings()
|
||||||
self._users = get_users()
|
self._users = get_users()
|
||||||
self._sessions = get_sessions()
|
self._sessions = get_sessions()
|
||||||
self._selected_user: User | None = None
|
self._selected_user: User | None = None
|
||||||
self._greetd_sock: socket.socket | None = None
|
self._greetd_sock: socket.socket | None = None
|
||||||
self._greetd_sock_lock = threading.Lock()
|
self._greetd_sock_lock = threading.Lock()
|
||||||
|
self._login_cancelled = threading.Event()
|
||||||
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
|
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
|
||||||
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
|
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
|
||||||
self._failed_attempts: dict[str, int] = {}
|
self._failed_attempts: dict[str, int] = {}
|
||||||
self._bg_path = bg_path
|
self._bg_path = bg_path
|
||||||
|
|
||||||
|
self._apply_global_theme()
|
||||||
self._build_ui()
|
self._build_ui()
|
||||||
self._setup_keyboard_navigation()
|
self._setup_keyboard_navigation()
|
||||||
# Defer initial user selection until the window is realized,
|
# Defer initial user selection until the window is realized,
|
||||||
@@ -245,10 +249,13 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
|
|
||||||
return bar
|
return bar
|
||||||
|
|
||||||
def _select_initial_user(self) -> None:
|
def _select_initial_user(self) -> bool:
|
||||||
"""Select the last user or the first available user."""
|
"""Select the last user or the first available user.
|
||||||
|
|
||||||
|
Returns False to deregister from GLib.idle_add after a single invocation.
|
||||||
|
"""
|
||||||
if not self._users:
|
if not self._users:
|
||||||
return
|
return False
|
||||||
|
|
||||||
# Try to load last user
|
# Try to load last user
|
||||||
last_username = self._load_last_user()
|
last_username = self._load_last_user()
|
||||||
@@ -264,6 +271,7 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
target_user = self._users[0]
|
target_user = self._users[0]
|
||||||
|
|
||||||
self._switch_to_user(target_user)
|
self._switch_to_user(target_user)
|
||||||
|
return False
|
||||||
|
|
||||||
def _switch_to_user(self, user: User) -> None:
|
def _switch_to_user(self, user: User) -> None:
|
||||||
"""Update the UI to show the selected user."""
|
"""Update the UI to show the selected user."""
|
||||||
@@ -286,26 +294,23 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
# which works in ZIP wheels too, no exists() check needed
|
# which works in ZIP wheels too, no exists() check needed
|
||||||
self._set_default_avatar()
|
self._set_default_avatar()
|
||||||
|
|
||||||
# Apply user's GTK theme if available
|
# Pre-select last used session for this user
|
||||||
self._apply_user_theme(user)
|
self._select_last_session(user)
|
||||||
|
|
||||||
# Focus password entry
|
# Focus password entry
|
||||||
self._password_entry.grab_focus()
|
self._password_entry.grab_focus()
|
||||||
|
|
||||||
def _apply_user_theme(self, user: User) -> None:
|
def _apply_global_theme(self) -> None:
|
||||||
"""Load the user's preferred GTK theme from their settings.ini."""
|
"""Apply the GTK theme from moongreet.toml configuration."""
|
||||||
gtk_config_dir = user.home / ".config" / "gtk-4.0"
|
theme_name = self._config.gtk_theme
|
||||||
theme_name = get_user_gtk_theme(config_dir=gtk_config_dir)
|
if not theme_name:
|
||||||
|
return
|
||||||
|
|
||||||
settings = Gtk.Settings.get_default()
|
settings = Gtk.Settings.get_default()
|
||||||
if settings is None:
|
if settings is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
current = settings.get_property("gtk-theme-name")
|
settings.set_property("gtk-theme-name", theme_name)
|
||||||
if theme_name and current != theme_name:
|
|
||||||
settings.set_property("gtk-theme-name", theme_name)
|
|
||||||
elif not theme_name and current:
|
|
||||||
settings.reset_property("gtk-theme-name")
|
|
||||||
|
|
||||||
def _get_foreground_color(self) -> str:
|
def _get_foreground_color(self) -> str:
|
||||||
"""Get the current GTK theme foreground color as a hex string."""
|
"""Get the current GTK theme foreground color as a hex string."""
|
||||||
@@ -339,6 +344,9 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
|
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
|
||||||
"""Load an image file and set it as the avatar, scaled to AVATAR_SIZE."""
|
"""Load an image file and set it as the avatar, scaled to AVATAR_SIZE."""
|
||||||
try:
|
try:
|
||||||
|
if path.stat().st_size > MAX_AVATAR_FILE_SIZE:
|
||||||
|
self._avatar_image.set_from_icon_name("avatar-default-symbolic")
|
||||||
|
return
|
||||||
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
|
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
|
||||||
str(path), AVATAR_SIZE, AVATAR_SIZE, True
|
str(path), AVATAR_SIZE, AVATAR_SIZE, True
|
||||||
)
|
)
|
||||||
@@ -428,6 +436,7 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Disable UI while authenticating — the IPC runs in a background thread
|
# Disable UI while authenticating — the IPC runs in a background thread
|
||||||
|
self._login_cancelled.clear()
|
||||||
self._set_login_sensitive(False)
|
self._set_login_sensitive(False)
|
||||||
thread = threading.Thread(
|
thread = threading.Thread(
|
||||||
target=self._login_worker,
|
target=self._login_worker,
|
||||||
@@ -439,26 +448,41 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
|
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
|
||||||
"""Run greetd IPC in a background thread to avoid blocking the GTK main loop."""
|
"""Run greetd IPC in a background thread to avoid blocking the GTK main loop."""
|
||||||
try:
|
try:
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
sock.settimeout(10.0)
|
sock.settimeout(10.0)
|
||||||
sock.connect(sock_path)
|
sock.connect(sock_path)
|
||||||
with self._greetd_sock_lock:
|
with self._greetd_sock_lock:
|
||||||
self._greetd_sock = sock
|
self._greetd_sock = sock
|
||||||
|
|
||||||
# Step 1: Create session
|
# Step 1: Create session — if a stale session exists, cancel it and retry
|
||||||
response = create_session(sock, user.username)
|
response = create_session(sock, user.username)
|
||||||
|
|
||||||
if response.get("type") == "error":
|
if self._login_cancelled.is_set():
|
||||||
GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if response.get("type") == "error":
|
||||||
|
cancel_session(sock)
|
||||||
|
response = create_session(sock, user.username)
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
if response.get("type") == "error":
|
||||||
|
GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
|
||||||
|
return
|
||||||
|
|
||||||
# Step 2: Send password if auth message received
|
# Step 2: Send password if auth message received
|
||||||
if response.get("type") == "auth_message":
|
if response.get("type") == "auth_message":
|
||||||
response = post_auth_response(sock, password)
|
response = post_auth_response(sock, password)
|
||||||
|
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if response.get("type") == "error":
|
if response.get("type") == "error":
|
||||||
self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
|
self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
|
||||||
warning = faillock_warning(self._failed_attempts[user.username], self._strings)
|
warning = faillock_warning(self._failed_attempts[user.username], self._strings)
|
||||||
|
cancel_session(sock)
|
||||||
GLib.idle_add(self._on_login_auth_error, response, warning)
|
GLib.idle_add(self._on_login_auth_error, response, warning)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -477,32 +501,36 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
return
|
return
|
||||||
response = start_session(sock, cmd)
|
response = start_session(sock, cmd)
|
||||||
|
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if response.get("type") == "success":
|
if response.get("type") == "success":
|
||||||
self._save_last_user(user.username)
|
self._save_last_user(user.username)
|
||||||
self._close_greetd_sock()
|
self._save_last_session(user.username, session.name)
|
||||||
GLib.idle_add(self.get_application().quit)
|
GLib.idle_add(self.get_application().quit)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
|
GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
|
||||||
|
return
|
||||||
|
|
||||||
|
except (ConnectionError, OSError, ValueError) as e:
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
# Socket was closed by _cancel_pending_session — exit silently
|
||||||
|
return
|
||||||
|
logger.error("greetd IPC error: %s", e)
|
||||||
|
if isinstance(e, ConnectionError):
|
||||||
|
GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
|
||||||
|
else:
|
||||||
|
GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
|
||||||
|
finally:
|
||||||
self._close_greetd_sock()
|
self._close_greetd_sock()
|
||||||
|
|
||||||
except ConnectionError as e:
|
|
||||||
logger.error("greetd connection error: %s", e)
|
|
||||||
self._close_greetd_sock()
|
|
||||||
GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
|
|
||||||
except (OSError, ValueError) as e:
|
|
||||||
logger.error("greetd socket error: %s", e)
|
|
||||||
self._close_greetd_sock()
|
|
||||||
GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
|
|
||||||
|
|
||||||
def _on_login_error(self, response: dict | None, message: str) -> None:
|
def _on_login_error(self, response: dict | None, message: str) -> None:
|
||||||
"""Handle login error on the GTK main thread."""
|
"""Handle login error on the GTK main thread."""
|
||||||
if response:
|
if response:
|
||||||
self._show_greetd_error(response, message)
|
self._show_greetd_error(response, message)
|
||||||
else:
|
else:
|
||||||
self._show_error(message)
|
self._show_error(message)
|
||||||
self._close_greetd_sock()
|
|
||||||
self._set_login_sensitive(True)
|
self._set_login_sensitive(True)
|
||||||
|
|
||||||
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
|
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
|
||||||
@@ -511,17 +539,16 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
if warning:
|
if warning:
|
||||||
current = self._error_label.get_text()
|
current = self._error_label.get_text()
|
||||||
self._error_label.set_text(f"{current}\n{warning}")
|
self._error_label.set_text(f"{current}\n{warning}")
|
||||||
self._close_greetd_sock()
|
|
||||||
self._set_login_sensitive(True)
|
self._set_login_sensitive(True)
|
||||||
|
|
||||||
def _cancel_pending_session(self) -> None:
|
def _cancel_pending_session(self) -> None:
|
||||||
"""Cancel any in-progress greetd session."""
|
"""Cancel any in-progress greetd session.
|
||||||
with self._greetd_sock_lock:
|
|
||||||
if self._greetd_sock:
|
Sets the cancellation event and closes the socket to interrupt
|
||||||
try:
|
any blocking I/O in the login worker. The worker checks the
|
||||||
cancel_session(self._greetd_sock)
|
event and exits silently instead of showing an error.
|
||||||
except (ConnectionError, OSError):
|
"""
|
||||||
pass
|
self._login_cancelled.set()
|
||||||
self._close_greetd_sock()
|
self._close_greetd_sock()
|
||||||
|
|
||||||
def _get_selected_session(self) -> Session | None:
|
def _get_selected_session(self) -> Session | None:
|
||||||
@@ -533,6 +560,18 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
return self._sessions[idx]
|
return self._sessions[idx]
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _select_last_session(self, user: User) -> None:
|
||||||
|
"""Pre-select the last used session for a user in the dropdown."""
|
||||||
|
if not self._sessions:
|
||||||
|
return
|
||||||
|
last_session_name = self._load_last_session(user.username)
|
||||||
|
if not last_session_name:
|
||||||
|
return
|
||||||
|
for i, session in enumerate(self._sessions):
|
||||||
|
if session.name == last_session_name:
|
||||||
|
self._session_dropdown.set_selected(i)
|
||||||
|
return
|
||||||
|
|
||||||
MAX_GREETD_ERROR_LENGTH = 200
|
MAX_GREETD_ERROR_LENGTH = 200
|
||||||
|
|
||||||
def _show_greetd_error(self, response: dict, fallback: str) -> None:
|
def _show_greetd_error(self, response: dict, fallback: str) -> None:
|
||||||
@@ -552,17 +591,26 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
|
|
||||||
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
|
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
|
||||||
"""Handle reboot button click."""
|
"""Handle reboot button click."""
|
||||||
try:
|
button.set_sensitive(False)
|
||||||
reboot()
|
threading.Thread(
|
||||||
except subprocess.CalledProcessError:
|
target=self._power_worker, args=(reboot, self._strings.reboot_failed),
|
||||||
self._show_error(self._strings.reboot_failed)
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
|
def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
|
||||||
"""Handle shutdown button click."""
|
"""Handle shutdown button click."""
|
||||||
|
button.set_sensitive(False)
|
||||||
|
threading.Thread(
|
||||||
|
target=self._power_worker, args=(shutdown, self._strings.shutdown_failed),
|
||||||
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
|
def _power_worker(self, action, error_msg: str) -> None:
|
||||||
|
"""Run a power action in a background thread to avoid blocking the GTK main loop."""
|
||||||
try:
|
try:
|
||||||
shutdown()
|
action()
|
||||||
except subprocess.CalledProcessError:
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
|
||||||
self._show_error(self._strings.shutdown_failed)
|
GLib.idle_add(self._show_error, error_msg)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _load_last_user() -> str | None:
|
def _load_last_user() -> str | None:
|
||||||
@@ -585,3 +633,32 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
LAST_USER_PATH.write_text(username)
|
LAST_USER_PATH.write_text(username)
|
||||||
except OSError:
|
except OSError:
|
||||||
pass # Non-critical — cache dir may not be writable
|
pass # Non-critical — cache dir may not be writable
|
||||||
|
|
||||||
|
MAX_SESSION_NAME_LENGTH = 256
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _save_last_session(username: str, session_name: str) -> None:
|
||||||
|
"""Save the last used session name for a user to cache."""
|
||||||
|
if not VALID_USERNAME.match(username) or len(username) > MAX_USERNAME_LENGTH:
|
||||||
|
return
|
||||||
|
if not session_name or len(session_name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
LAST_SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
(LAST_SESSION_DIR / username).write_text(session_name)
|
||||||
|
except OSError:
|
||||||
|
pass # Non-critical — cache dir may not be writable
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _load_last_session(username: str) -> str | None:
|
||||||
|
"""Load the last used session name for a user from cache."""
|
||||||
|
session_file = LAST_SESSION_DIR / username
|
||||||
|
if not session_file.exists():
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
name = session_file.read_text().strip()
|
||||||
|
except OSError:
|
||||||
|
return None
|
||||||
|
if not name or len(name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
|
||||||
|
return None
|
||||||
|
return name
|
||||||
|
|||||||
@@ -22,14 +22,16 @@ def _recvall(sock: Any, n: int) -> bytes:
|
|||||||
def send_message(sock: Any, msg: dict) -> None:
|
def send_message(sock: Any, msg: dict) -> None:
|
||||||
"""Send a length-prefixed JSON message to the greetd socket."""
|
"""Send a length-prefixed JSON message to the greetd socket."""
|
||||||
payload = json.dumps(msg).encode("utf-8")
|
payload = json.dumps(msg).encode("utf-8")
|
||||||
header = struct.pack("!I", len(payload))
|
if len(payload) > MAX_PAYLOAD_SIZE:
|
||||||
|
raise ValueError(f"Payload too large: {len(payload)} bytes (max {MAX_PAYLOAD_SIZE})")
|
||||||
|
header = struct.pack("=I", len(payload))
|
||||||
sock.sendall(header + payload)
|
sock.sendall(header + payload)
|
||||||
|
|
||||||
|
|
||||||
def recv_message(sock: Any) -> dict:
|
def recv_message(sock: Any) -> dict:
|
||||||
"""Receive a length-prefixed JSON message from the greetd socket."""
|
"""Receive a length-prefixed JSON message from the greetd socket."""
|
||||||
header = _recvall(sock, 4)
|
header = _recvall(sock, 4)
|
||||||
length = struct.unpack("!I", header)[0]
|
length = struct.unpack("=I", header)[0]
|
||||||
|
|
||||||
if length > MAX_PAYLOAD_SIZE:
|
if length > MAX_PAYLOAD_SIZE:
|
||||||
raise ConnectionError(f"Payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})")
|
raise ConnectionError(f"Payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})")
|
||||||
|
|||||||
+44
-8
@@ -4,6 +4,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
from importlib.resources import files
|
from importlib.resources import files
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import gi
|
import gi
|
||||||
gi.require_version("Gtk", "4.0")
|
gi.require_version("Gtk", "4.0")
|
||||||
@@ -21,9 +22,38 @@ try:
|
|||||||
except (ValueError, ImportError):
|
except (ValueError, ImportError):
|
||||||
HAS_LAYER_SHELL = False
|
HAS_LAYER_SHELL = False
|
||||||
|
|
||||||
|
LOG_DIR = Path("/var/cache/moongreet")
|
||||||
|
LOG_FILE = LOG_DIR / "moongreet.log"
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_logging() -> None:
|
||||||
|
"""Configure logging to file and stderr."""
|
||||||
|
root = logging.getLogger()
|
||||||
|
root.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
formatter = logging.Formatter(
|
||||||
|
"%(asctime)s %(levelname)s %(name)s: %(message)s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Always log to stderr
|
||||||
|
stderr_handler = logging.StreamHandler(sys.stderr)
|
||||||
|
stderr_handler.setLevel(logging.INFO)
|
||||||
|
stderr_handler.setFormatter(formatter)
|
||||||
|
root.addHandler(stderr_handler)
|
||||||
|
|
||||||
|
# Log to file if the directory is writable
|
||||||
|
if LOG_DIR.is_dir():
|
||||||
|
try:
|
||||||
|
file_handler = logging.FileHandler(LOG_FILE)
|
||||||
|
file_handler.setLevel(logging.INFO)
|
||||||
|
file_handler.setFormatter(formatter)
|
||||||
|
root.addHandler(file_handler)
|
||||||
|
except PermissionError:
|
||||||
|
logger.warning("Cannot write to %s", LOG_FILE)
|
||||||
|
|
||||||
|
|
||||||
class MoongreetApp(Gtk.Application):
|
class MoongreetApp(Gtk.Application):
|
||||||
"""GTK Application for the Moongreet greeter."""
|
"""GTK Application for the Moongreet greeter."""
|
||||||
|
|
||||||
@@ -34,14 +64,18 @@ class MoongreetApp(Gtk.Application):
|
|||||||
|
|
||||||
def do_activate(self) -> None:
|
def do_activate(self) -> None:
|
||||||
"""Create and present greeter windows on all monitors."""
|
"""Create and present greeter windows on all monitors."""
|
||||||
self._register_icons()
|
display = Gdk.Display.get_default()
|
||||||
self._load_css()
|
if display is None:
|
||||||
|
logger.error("No display available — cannot start greeter UI")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._register_icons(display)
|
||||||
|
self._load_css(display)
|
||||||
|
|
||||||
# Resolve wallpaper once, share across all windows
|
# Resolve wallpaper once, share across all windows
|
||||||
config = load_config()
|
config = load_config()
|
||||||
bg_path, self._wallpaper_ctx = resolve_wallpaper_path(config)
|
bg_path, self._wallpaper_ctx = resolve_wallpaper_path(config)
|
||||||
|
|
||||||
display = Gdk.Display.get_default()
|
|
||||||
monitors = display.get_monitors()
|
monitors = display.get_monitors()
|
||||||
primary_monitor = None
|
primary_monitor = None
|
||||||
|
|
||||||
@@ -55,7 +89,7 @@ class MoongreetApp(Gtk.Application):
|
|||||||
primary_monitor = monitors.get_item(0)
|
primary_monitor = monitors.get_item(0)
|
||||||
|
|
||||||
# Main greeter window (login UI) on primary monitor
|
# Main greeter window (login UI) on primary monitor
|
||||||
greeter = GreeterWindow(bg_path=bg_path, application=self)
|
greeter = GreeterWindow(bg_path=bg_path, config=config, application=self)
|
||||||
if HAS_LAYER_SHELL:
|
if HAS_LAYER_SHELL:
|
||||||
self._setup_layer_shell(greeter, keyboard=True)
|
self._setup_layer_shell(greeter, keyboard=True)
|
||||||
if primary_monitor is not None:
|
if primary_monitor is not None:
|
||||||
@@ -81,19 +115,19 @@ class MoongreetApp(Gtk.Application):
|
|||||||
self._wallpaper_ctx = None
|
self._wallpaper_ctx = None
|
||||||
Gtk.Application.do_shutdown(self)
|
Gtk.Application.do_shutdown(self)
|
||||||
|
|
||||||
def _register_icons(self) -> None:
|
def _register_icons(self, display: Gdk.Display) -> None:
|
||||||
"""Register custom icons from the package data/icons directory."""
|
"""Register custom icons from the package data/icons directory."""
|
||||||
icons_dir = files("moongreet") / "data" / "icons"
|
icons_dir = files("moongreet") / "data" / "icons"
|
||||||
icon_theme = Gtk.IconTheme.get_for_display(Gdk.Display.get_default())
|
icon_theme = Gtk.IconTheme.get_for_display(display)
|
||||||
icon_theme.add_search_path(str(icons_dir))
|
icon_theme.add_search_path(str(icons_dir))
|
||||||
|
|
||||||
def _load_css(self) -> None:
|
def _load_css(self, display: Gdk.Display) -> None:
|
||||||
"""Load the CSS stylesheet for the greeter."""
|
"""Load the CSS stylesheet for the greeter."""
|
||||||
css_provider = Gtk.CssProvider()
|
css_provider = Gtk.CssProvider()
|
||||||
css_path = files("moongreet") / "style.css"
|
css_path = files("moongreet") / "style.css"
|
||||||
css_provider.load_from_path(str(css_path))
|
css_provider.load_from_path(str(css_path))
|
||||||
Gtk.StyleContext.add_provider_for_display(
|
Gtk.StyleContext.add_provider_for_display(
|
||||||
Gdk.Display.get_default(),
|
display,
|
||||||
css_provider,
|
css_provider,
|
||||||
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
|
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
|
||||||
)
|
)
|
||||||
@@ -118,6 +152,8 @@ class MoongreetApp(Gtk.Application):
|
|||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
"""Run the Moongreet application."""
|
"""Run the Moongreet application."""
|
||||||
|
_setup_logging()
|
||||||
|
logger.info("Moongreet starting")
|
||||||
app = MoongreetApp()
|
app = MoongreetApp()
|
||||||
app.run(sys.argv)
|
app.run(sys.argv)
|
||||||
|
|
||||||
|
|||||||
@@ -2,12 +2,9 @@
|
|||||||
# ABOUTME: Provides User dataclass and helper functions for the greeter UI.
|
# ABOUTME: Provides User dataclass and helper functions for the greeter UI.
|
||||||
|
|
||||||
import configparser
|
import configparser
|
||||||
import re
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
VALID_THEME_NAME = re.compile(r"^[A-Za-z0-9_-]+$")
|
|
||||||
|
|
||||||
NOLOGIN_SHELLS = {"/usr/sbin/nologin", "/sbin/nologin", "/bin/false", "/usr/bin/nologin"}
|
NOLOGIN_SHELLS = {"/usr/sbin/nologin", "/sbin/nologin", "/bin/false", "/usr/bin/nologin"}
|
||||||
MIN_UID = 1000
|
MIN_UID = 1000
|
||||||
MAX_UID = 65533
|
MAX_UID = 65533
|
||||||
@@ -106,8 +103,7 @@ def get_user_gtk_theme(config_dir: Path | None = None) -> str | None:
|
|||||||
|
|
||||||
if config.has_option("Settings", "gtk-theme-name"):
|
if config.has_option("Settings", "gtk-theme-name"):
|
||||||
theme = config.get("Settings", "gtk-theme-name")
|
theme = config.get("Settings", "gtk-theme-name")
|
||||||
# Validate against path traversal — only allow safe theme names
|
if theme:
|
||||||
if theme and VALID_THEME_NAME.match(theme):
|
|
||||||
return theme
|
return theme
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -43,6 +43,26 @@ class TestLoadConfig:
|
|||||||
|
|
||||||
assert config.background is None
|
assert config.background is None
|
||||||
|
|
||||||
|
def test_loads_gtk_theme(self, tmp_path: Path) -> None:
|
||||||
|
toml_file = tmp_path / "moongreet.toml"
|
||||||
|
toml_file.write_text(
|
||||||
|
"[appearance]\n"
|
||||||
|
'gtk-theme = "Catppuccin-Mocha-Standard-Blue-Dark"\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
config = load_config(toml_file)
|
||||||
|
|
||||||
|
assert config.gtk_theme == "Catppuccin-Mocha-Standard-Blue-Dark"
|
||||||
|
|
||||||
|
def test_returns_none_gtk_theme_when_missing(self, tmp_path: Path) -> None:
|
||||||
|
toml_file = tmp_path / "moongreet.toml"
|
||||||
|
toml_file.write_text("[appearance]\n")
|
||||||
|
|
||||||
|
config = load_config(toml_file)
|
||||||
|
|
||||||
|
assert config.gtk_theme is None
|
||||||
|
|
||||||
|
|
||||||
def test_resolves_relative_path_against_config_dir(self, tmp_path: Path) -> None:
|
def test_resolves_relative_path_against_config_dir(self, tmp_path: Path) -> None:
|
||||||
toml_file = tmp_path / "moongreet.toml"
|
toml_file = tmp_path / "moongreet.toml"
|
||||||
toml_file.write_text(
|
toml_file.write_text(
|
||||||
|
|||||||
+217
-5
@@ -10,7 +10,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from moongreet.greeter import faillock_warning, FAILLOCK_MAX_ATTEMPTS
|
from moongreet.greeter import faillock_warning, FAILLOCK_MAX_ATTEMPTS, LAST_SESSION_DIR
|
||||||
from moongreet.i18n import load_strings
|
from moongreet.i18n import load_strings
|
||||||
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
||||||
|
|
||||||
@@ -59,14 +59,14 @@ class MockGreetd:
|
|||||||
header = self._recvall(conn, 4)
|
header = self._recvall(conn, 4)
|
||||||
if len(header) < 4:
|
if len(header) < 4:
|
||||||
break
|
break
|
||||||
length = struct.unpack("!I", header)[0]
|
length = struct.unpack("=I", header)[0]
|
||||||
payload = self._recvall(conn, length)
|
payload = self._recvall(conn, length)
|
||||||
msg = json.loads(payload.decode("utf-8"))
|
msg = json.loads(payload.decode("utf-8"))
|
||||||
self._received.append(msg)
|
self._received.append(msg)
|
||||||
|
|
||||||
# Send response
|
# Send response
|
||||||
resp_payload = json.dumps(response).encode("utf-8")
|
resp_payload = json.dumps(response).encode("utf-8")
|
||||||
conn.sendall(struct.pack("!I", len(resp_payload)) + resp_payload)
|
conn.sendall(struct.pack("=I", len(resp_payload)) + resp_payload)
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -112,12 +112,13 @@ class TestLoginFlow:
|
|||||||
assert mock.received[1] == {"type": "post_auth_message_response", "response": "geheim"}
|
assert mock.received[1] == {"type": "post_auth_message_response", "response": "geheim"}
|
||||||
assert mock.received[2] == {"type": "start_session", "cmd": ["Hyprland"]}
|
assert mock.received[2] == {"type": "start_session", "cmd": ["Hyprland"]}
|
||||||
|
|
||||||
def test_wrong_password(self, tmp_path: Path) -> None:
|
def test_wrong_password_sends_cancel(self, tmp_path: Path) -> None:
|
||||||
"""Simulate a failed login due to wrong password."""
|
"""After a failed login, cancel_session must be sent to free the greetd session."""
|
||||||
sock_path = tmp_path / "greetd.sock"
|
sock_path = tmp_path / "greetd.sock"
|
||||||
mock = MockGreetd(sock_path)
|
mock = MockGreetd(sock_path)
|
||||||
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
|
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
|
||||||
mock.expect({"type": "error", "error_type": "auth_error", "description": "Authentication failed"})
|
mock.expect({"type": "error", "error_type": "auth_error", "description": "Authentication failed"})
|
||||||
|
mock.expect({"type": "success"}) # Response to cancel_session
|
||||||
mock.start()
|
mock.start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -131,10 +132,64 @@ class TestLoginFlow:
|
|||||||
assert response["type"] == "error"
|
assert response["type"] == "error"
|
||||||
assert response["description"] == "Authentication failed"
|
assert response["description"] == "Authentication failed"
|
||||||
|
|
||||||
|
# The greeter must cancel the session after auth failure
|
||||||
|
response = cancel_session(sock)
|
||||||
|
assert response["type"] == "success"
|
||||||
|
|
||||||
sock.close()
|
sock.close()
|
||||||
finally:
|
finally:
|
||||||
mock.stop()
|
mock.stop()
|
||||||
|
|
||||||
|
assert mock.received[2] == {"type": "cancel_session"}
|
||||||
|
|
||||||
|
def test_stale_session_cancel_and_retry(self, tmp_path: Path) -> None:
|
||||||
|
"""When create_session fails due to a stale session, cancel and retry."""
|
||||||
|
sock_path = tmp_path / "greetd.sock"
|
||||||
|
mock = MockGreetd(sock_path)
|
||||||
|
# First create_session → error (stale session)
|
||||||
|
mock.expect({"type": "error", "error_type": "error", "description": "a session is already being configured"})
|
||||||
|
# cancel_session → success
|
||||||
|
mock.expect({"type": "success"})
|
||||||
|
# Second create_session → auth_message (retry succeeds)
|
||||||
|
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
|
||||||
|
# post_auth_response → success
|
||||||
|
mock.expect({"type": "success"})
|
||||||
|
# start_session → success
|
||||||
|
mock.expect({"type": "success"})
|
||||||
|
mock.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.connect(str(sock_path))
|
||||||
|
|
||||||
|
# Step 1: Create session fails
|
||||||
|
response = create_session(sock, "dominik")
|
||||||
|
assert response["type"] == "error"
|
||||||
|
|
||||||
|
# Step 2: Cancel stale session
|
||||||
|
response = cancel_session(sock)
|
||||||
|
assert response["type"] == "success"
|
||||||
|
|
||||||
|
# Step 3: Retry create session
|
||||||
|
response = create_session(sock, "dominik")
|
||||||
|
assert response["type"] == "auth_message"
|
||||||
|
|
||||||
|
# Step 4: Send password
|
||||||
|
response = post_auth_response(sock, "geheim")
|
||||||
|
assert response["type"] == "success"
|
||||||
|
|
||||||
|
# Step 5: Start session
|
||||||
|
response = start_session(sock, ["niri-session"])
|
||||||
|
assert response["type"] == "success"
|
||||||
|
|
||||||
|
sock.close()
|
||||||
|
finally:
|
||||||
|
mock.stop()
|
||||||
|
|
||||||
|
assert mock.received[0] == {"type": "create_session", "username": "dominik"}
|
||||||
|
assert mock.received[1] == {"type": "cancel_session"}
|
||||||
|
assert mock.received[2] == {"type": "create_session", "username": "dominik"}
|
||||||
|
|
||||||
def test_multi_stage_auth_sends_cancel(self, tmp_path: Path) -> None:
|
def test_multi_stage_auth_sends_cancel(self, tmp_path: Path) -> None:
|
||||||
"""When greetd sends a second auth_message after password, cancel the session."""
|
"""When greetd sends a second auth_message after password, cancel the session."""
|
||||||
sock_path = tmp_path / "greetd.sock"
|
sock_path = tmp_path / "greetd.sock"
|
||||||
@@ -190,6 +245,101 @@ class TestLoginFlow:
|
|||||||
assert mock.received[1] == {"type": "cancel_session"}
|
assert mock.received[1] == {"type": "cancel_session"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestSessionCancellation:
|
||||||
|
"""Tests for cancelling an in-progress greetd session during user switch."""
|
||||||
|
|
||||||
|
def test_cancel_closes_socket_and_sets_event(self, tmp_path: Path) -> None:
|
||||||
|
"""_cancel_pending_session should close the socket and set the cancelled event."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
|
||||||
|
# Create a real socket pair to verify close
|
||||||
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock_path = tmp_path / "test.sock"
|
||||||
|
server.bind(str(sock_path))
|
||||||
|
server.listen(1)
|
||||||
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
client.connect(str(sock_path))
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
win._greetd_sock = client
|
||||||
|
win._cancel_pending_session()
|
||||||
|
|
||||||
|
assert win._login_cancelled.is_set()
|
||||||
|
assert win._greetd_sock is None
|
||||||
|
|
||||||
|
def test_cancel_is_noop_without_socket(self) -> None:
|
||||||
|
"""_cancel_pending_session should be safe to call when no socket exists."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
win._greetd_sock = None
|
||||||
|
|
||||||
|
win._cancel_pending_session()
|
||||||
|
|
||||||
|
assert win._login_cancelled.is_set()
|
||||||
|
assert win._greetd_sock is None
|
||||||
|
|
||||||
|
def test_cancel_does_not_block_main_thread(self, tmp_path: Path) -> None:
|
||||||
|
"""_cancel_pending_session must not do blocking I/O — only close the socket."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
win._greetd_sock = sock
|
||||||
|
|
||||||
|
# Should complete nearly instantly (no IPC calls)
|
||||||
|
import time
|
||||||
|
start = time.monotonic()
|
||||||
|
win._cancel_pending_session()
|
||||||
|
elapsed = time.monotonic() - start
|
||||||
|
|
||||||
|
assert elapsed < 0.1 # No blocking I/O
|
||||||
|
|
||||||
|
def test_worker_exits_silently_when_cancelled(self, tmp_path: Path) -> None:
|
||||||
|
"""_login_worker should exit without showing an error when cancelled mid-flight."""
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
from moongreet.users import User
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
win._greetd_sock = None
|
||||||
|
win._failed_attempts = {}
|
||||||
|
win._strings = MagicMock()
|
||||||
|
|
||||||
|
# Set cancelled before the worker runs
|
||||||
|
win._login_cancelled.set()
|
||||||
|
|
||||||
|
# Create a socket that will fail (simulating closed socket)
|
||||||
|
sock_path = tmp_path / "greetd.sock"
|
||||||
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
server.bind(str(sock_path))
|
||||||
|
server.listen(1)
|
||||||
|
|
||||||
|
user = User(username="dom", uid=1000, gecos="Dominik", home=Path("/home/dom"), shell="/bin/zsh")
|
||||||
|
|
||||||
|
with patch("moongreet.greeter.GLib.idle_add") as mock_idle:
|
||||||
|
win._login_worker(user, "pw", MagicMock(exec_cmd="niri-session"), str(sock_path))
|
||||||
|
|
||||||
|
# Should NOT have scheduled any error callback
|
||||||
|
for call in mock_idle.call_args_list:
|
||||||
|
func = call[0][0]
|
||||||
|
assert func != win._on_login_error, "Worker should not show error when cancelled"
|
||||||
|
assert func != win._on_login_auth_error, "Worker should not show auth error when cancelled"
|
||||||
|
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
|
||||||
class TestFaillockWarning:
|
class TestFaillockWarning:
|
||||||
"""Tests for the faillock warning message logic."""
|
"""Tests for the faillock warning message logic."""
|
||||||
|
|
||||||
@@ -264,3 +414,65 @@ class TestLastUser:
|
|||||||
from moongreet.greeter import GreeterWindow
|
from moongreet.greeter import GreeterWindow
|
||||||
result = GreeterWindow._load_last_user()
|
result = GreeterWindow._load_last_user()
|
||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestLastSession:
|
||||||
|
"""Tests for saving and loading the last session per user."""
|
||||||
|
|
||||||
|
def test_save_and_load_last_session(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
|
||||||
|
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
GreeterWindow._save_last_session("dominik", "Niri")
|
||||||
|
|
||||||
|
session_file = tmp_path / "dominik"
|
||||||
|
assert session_file.exists()
|
||||||
|
assert session_file.read_text() == "Niri"
|
||||||
|
|
||||||
|
result = GreeterWindow._load_last_session("dominik")
|
||||||
|
assert result == "Niri"
|
||||||
|
|
||||||
|
def test_load_last_session_missing_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
|
||||||
|
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
result = GreeterWindow._load_last_session("nobody")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_load_last_session_rejects_oversized_name(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
|
||||||
|
(tmp_path / "dominik").write_text("A" * 300)
|
||||||
|
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
result = GreeterWindow._load_last_session("dominik")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_save_last_session_validates_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
"""Usernames with path traversal should not create files outside the cache dir."""
|
||||||
|
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
|
||||||
|
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
GreeterWindow._save_last_session("../../etc/evil", "Niri")
|
||||||
|
|
||||||
|
# Should not have created any file
|
||||||
|
assert not (tmp_path / "../../etc/evil").exists()
|
||||||
|
|
||||||
|
def test_regex_rejects_dot_dot_username(self) -> None:
|
||||||
|
"""Username '..' must not pass VALID_USERNAME validation."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match("..") is None
|
||||||
|
|
||||||
|
def test_regex_rejects_dot_username(self) -> None:
|
||||||
|
"""Username '.' must not pass VALID_USERNAME validation."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match(".") is None
|
||||||
|
|
||||||
|
def test_regex_allows_dot_in_middle(self) -> None:
|
||||||
|
"""Usernames like 'first.last' must still be valid."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match("first.last") is not None
|
||||||
|
|
||||||
|
def test_regex_rejects_leading_dot(self) -> None:
|
||||||
|
"""Usernames starting with '.' are rejected (hidden files)."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match(".hidden") is None
|
||||||
|
|||||||
+20
-12
@@ -38,7 +38,7 @@ class FakeSocket:
|
|||||||
def with_response(cls, response: dict) -> "FakeSocket":
|
def with_response(cls, response: dict) -> "FakeSocket":
|
||||||
"""Create a FakeSocket pre-loaded with a length-prefixed JSON response."""
|
"""Create a FakeSocket pre-loaded with a length-prefixed JSON response."""
|
||||||
payload = json.dumps(response).encode("utf-8")
|
payload = json.dumps(response).encode("utf-8")
|
||||||
data = struct.pack("!I", len(payload)) + payload
|
data = struct.pack("=I", len(payload)) + payload
|
||||||
return cls(recv_data=data)
|
return cls(recv_data=data)
|
||||||
|
|
||||||
|
|
||||||
@@ -73,7 +73,7 @@ class TestSendMessage:
|
|||||||
send_message(sock, msg)
|
send_message(sock, msg)
|
||||||
|
|
||||||
payload = json.dumps(msg).encode("utf-8")
|
payload = json.dumps(msg).encode("utf-8")
|
||||||
expected = struct.pack("!I", len(payload)) + payload
|
expected = struct.pack("=I", len(payload)) + payload
|
||||||
assert bytes(sock.sent) == expected
|
assert bytes(sock.sent) == expected
|
||||||
|
|
||||||
def test_sends_empty_dict(self) -> None:
|
def test_sends_empty_dict(self) -> None:
|
||||||
@@ -82,7 +82,7 @@ class TestSendMessage:
|
|||||||
send_message(sock, {})
|
send_message(sock, {})
|
||||||
|
|
||||||
payload = json.dumps({}).encode("utf-8")
|
payload = json.dumps({}).encode("utf-8")
|
||||||
expected = struct.pack("!I", len(payload)) + payload
|
expected = struct.pack("=I", len(payload)) + payload
|
||||||
assert bytes(sock.sent) == expected
|
assert bytes(sock.sent) == expected
|
||||||
|
|
||||||
def test_sends_nested_message(self) -> None:
|
def test_sends_nested_message(self) -> None:
|
||||||
@@ -93,12 +93,20 @@ class TestSendMessage:
|
|||||||
|
|
||||||
# Verify the payload is correctly length-prefixed
|
# Verify the payload is correctly length-prefixed
|
||||||
length_bytes = bytes(sock.sent[:4])
|
length_bytes = bytes(sock.sent[:4])
|
||||||
length = struct.unpack("!I", length_bytes)[0]
|
length = struct.unpack("=I", length_bytes)[0]
|
||||||
decoded = json.loads(sock.sent[4:])
|
decoded = json.loads(sock.sent[4:])
|
||||||
assert length == len(json.dumps(msg).encode("utf-8"))
|
assert length == len(json.dumps(msg).encode("utf-8"))
|
||||||
assert decoded == msg
|
assert decoded == msg
|
||||||
|
|
||||||
|
|
||||||
|
def test_rejects_oversized_payload(self) -> None:
|
||||||
|
sock = FakeSocket()
|
||||||
|
msg = {"type": "huge", "data": "x" * 100000}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Payload too large"):
|
||||||
|
send_message(sock, msg)
|
||||||
|
|
||||||
|
|
||||||
class TestRecvMessage:
|
class TestRecvMessage:
|
||||||
"""Tests for receiving and decoding length-prefixed JSON messages."""
|
"""Tests for receiving and decoding length-prefixed JSON messages."""
|
||||||
|
|
||||||
@@ -132,7 +140,7 @@ class TestRecvMessage:
|
|||||||
"""recv() may return fewer bytes than requested — must loop."""
|
"""recv() may return fewer bytes than requested — must loop."""
|
||||||
response = {"type": "success"}
|
response = {"type": "success"}
|
||||||
payload = json.dumps(response).encode("utf-8")
|
payload = json.dumps(response).encode("utf-8")
|
||||||
data = struct.pack("!I", len(payload)) + payload
|
data = struct.pack("=I", len(payload)) + payload
|
||||||
sock = FragmentingSocket(data, chunk_size=3)
|
sock = FragmentingSocket(data, chunk_size=3)
|
||||||
|
|
||||||
result = recv_message(sock)
|
result = recv_message(sock)
|
||||||
@@ -141,7 +149,7 @@ class TestRecvMessage:
|
|||||||
|
|
||||||
def test_rejects_oversized_payload(self) -> None:
|
def test_rejects_oversized_payload(self) -> None:
|
||||||
"""Payloads exceeding the size limit must be rejected."""
|
"""Payloads exceeding the size limit must be rejected."""
|
||||||
header = struct.pack("!I", 10_000_000)
|
header = struct.pack("=I", 10_000_000)
|
||||||
sock = FakeSocket(recv_data=header)
|
sock = FakeSocket(recv_data=header)
|
||||||
|
|
||||||
with pytest.raises(ConnectionError, match="too large"):
|
with pytest.raises(ConnectionError, match="too large"):
|
||||||
@@ -162,7 +170,7 @@ class TestCreateSession:
|
|||||||
result = create_session(sock, "dominik")
|
result = create_session(sock, "dominik")
|
||||||
|
|
||||||
# Verify sent message
|
# Verify sent message
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {"type": "create_session", "username": "dominik"}
|
assert sent_msg == {"type": "create_session", "username": "dominik"}
|
||||||
assert result == response
|
assert result == response
|
||||||
@@ -177,7 +185,7 @@ class TestPostAuthResponse:
|
|||||||
|
|
||||||
result = post_auth_response(sock, "mypassword")
|
result = post_auth_response(sock, "mypassword")
|
||||||
|
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {
|
assert sent_msg == {
|
||||||
"type": "post_auth_message_response",
|
"type": "post_auth_message_response",
|
||||||
@@ -192,7 +200,7 @@ class TestPostAuthResponse:
|
|||||||
|
|
||||||
result = post_auth_response(sock, None)
|
result = post_auth_response(sock, None)
|
||||||
|
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {
|
assert sent_msg == {
|
||||||
"type": "post_auth_message_response",
|
"type": "post_auth_message_response",
|
||||||
@@ -209,7 +217,7 @@ class TestStartSession:
|
|||||||
|
|
||||||
result = start_session(sock, ["Hyprland"])
|
result = start_session(sock, ["Hyprland"])
|
||||||
|
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {"type": "start_session", "cmd": ["Hyprland"]}
|
assert sent_msg == {"type": "start_session", "cmd": ["Hyprland"]}
|
||||||
assert result == response
|
assert result == response
|
||||||
@@ -220,7 +228,7 @@ class TestStartSession:
|
|||||||
|
|
||||||
result = start_session(sock, ["sway", "--config", "/etc/sway/config"])
|
result = start_session(sock, ["sway", "--config", "/etc/sway/config"])
|
||||||
|
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {
|
assert sent_msg == {
|
||||||
"type": "start_session",
|
"type": "start_session",
|
||||||
@@ -237,7 +245,7 @@ class TestCancelSession:
|
|||||||
|
|
||||||
result = cancel_session(sock)
|
result = cancel_session(sock)
|
||||||
|
|
||||||
length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
|
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
|
||||||
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
sent_msg = json.loads(sock.sent[4 : 4 + length])
|
||||||
assert sent_msg == {"type": "cancel_session"}
|
assert sent_msg == {"type": "cancel_session"}
|
||||||
assert result == response
|
assert result == response
|
||||||
|
|||||||
@@ -27,6 +27,13 @@ class TestReboot:
|
|||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
reboot()
|
reboot()
|
||||||
|
|
||||||
|
@patch("moongreet.power.subprocess.run")
|
||||||
|
def test_raises_on_timeout(self, mock_run) -> None:
|
||||||
|
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
|
||||||
|
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
reboot()
|
||||||
|
|
||||||
|
|
||||||
class TestShutdown:
|
class TestShutdown:
|
||||||
"""Tests for the shutdown power action."""
|
"""Tests for the shutdown power action."""
|
||||||
@@ -45,3 +52,10 @@ class TestShutdown:
|
|||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
shutdown()
|
shutdown()
|
||||||
|
|
||||||
|
@patch("moongreet.power.subprocess.run")
|
||||||
|
def test_raises_on_timeout(self, mock_run) -> None:
|
||||||
|
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
|
||||||
|
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
shutdown()
|
||||||
|
|||||||
+6
-15
@@ -186,27 +186,18 @@ class TestGetUserGtkTheme:
|
|||||||
|
|
||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
def test_handles_interpolation_characters(self, tmp_path: Path) -> None:
|
def test_passes_theme_with_special_characters(self, tmp_path: Path) -> None:
|
||||||
"""Theme names with % characters are rejected by validation."""
|
"""Theme names with special characters are passed through to GTK."""
|
||||||
gtk_dir = tmp_path / ".config" / "gtk-4.0"
|
gtk_dir = tmp_path / ".config" / "gtk-4.0"
|
||||||
gtk_dir.mkdir(parents=True)
|
gtk_dir.mkdir(parents=True)
|
||||||
settings = gtk_dir / "settings.ini"
|
settings = gtk_dir / "settings.ini"
|
||||||
settings.write_text("[Settings]\ngtk-theme-name=My%Theme\n")
|
settings.write_text(
|
||||||
|
"[Settings]\ngtk-theme-name=catppuccin-mocha-lavender-standard+default\n"
|
||||||
|
)
|
||||||
|
|
||||||
result = get_user_gtk_theme(config_dir=gtk_dir)
|
result = get_user_gtk_theme(config_dir=gtk_dir)
|
||||||
|
|
||||||
assert result is None
|
assert result == "catppuccin-mocha-lavender-standard+default"
|
||||||
|
|
||||||
def test_rejects_path_traversal_theme_name(self, tmp_path: Path) -> None:
|
|
||||||
"""Theme names with path traversal characters should be rejected."""
|
|
||||||
gtk_dir = tmp_path / ".config" / "gtk-4.0"
|
|
||||||
gtk_dir.mkdir(parents=True)
|
|
||||||
settings = gtk_dir / "settings.ini"
|
|
||||||
settings.write_text("[Settings]\ngtk-theme-name=../../../../etc/evil\n")
|
|
||||||
|
|
||||||
result = get_user_gtk_theme(config_dir=gtk_dir)
|
|
||||||
|
|
||||||
assert result is None
|
|
||||||
|
|
||||||
def test_ignores_symlinked_accountsservice_icon(self, tmp_path: Path) -> None:
|
def test_ignores_symlinked_accountsservice_icon(self, tmp_path: Path) -> None:
|
||||||
"""AccountsService icon as symlink should be ignored to prevent traversal."""
|
"""AccountsService icon as symlink should be ignored to prevent traversal."""
|
||||||
|
|||||||
Reference in New Issue
Block a user