nevaforget 64f08d7e8b Harden greeter against threading issues, path traversal, and edge cases
Security:
- Fix path traversal in _save/_load_last_session by rejecting usernames
  starting with dot (blocks '..' and hidden file creation)
- Add avatar file size limit (10 MB) to prevent DoS via large ~/.face
- Add session_name length validation on write (symmetric with read)
- Add payload size check to send_message (symmetric with recv_message)
- Set log level to INFO in production (was DEBUG)

Quality:
- Eliminate main-thread blocking on user switch: _cancel_pending_session
  now sets a cancellation event and closes the socket instead of doing
  blocking IPC. The login worker checks the event after each step.
- Move power actions (reboot/shutdown) to background threads
- Catch TimeoutExpired in addition to CalledProcessError for power actions
- Consolidate socket cleanup in _login_worker via finally block, remove
  redundant _close_greetd_sock calls from error callbacks
- Fix _select_initial_user to return False for GLib.idle_add deregistration
- Fix context manager leak in resolve_wallpaper_path on exception
- Pass Config object to GreeterWindow instead of loading it twice
2026-03-26 16:25:13 +01:00

665 lines
25 KiB
Python

# ABOUTME: Main greeter window — builds the GTK4 UI layout for the Moongreet greeter.
# ABOUTME: Handles user selection, session choice, password entry, and power actions.
import logging
import os
import re
import shlex
import shutil
import socket
import stat
import subprocess
import threading
from importlib.resources import files
from pathlib import Path
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Gdk", "4.0")
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
from moongreet.config import Config, load_config, resolve_wallpaper_path
from moongreet.i18n import load_strings, Strings
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
from moongreet.users import User, get_users, get_avatar_path
from moongreet.sessions import Session, get_sessions
from moongreet.power import reboot, shutdown
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Cache file that stores the username written after a successful session start.
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
# Cache directory holding one file per username with that user's last session name.
LAST_SESSION_DIR = Path("/var/cache/moongreet/last-session")
# Failed-attempt count at which faillock_warning() reports the account as locked.
FAILLOCK_MAX_ATTEMPTS = 3
# First character: letter, digit or underscore; later characters may also be "." or "-".
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]*$")
# Longest username accepted by the cache read/write helpers.
MAX_USERNAME_LENGTH = 256
# "data" resource directory inside the installed moongreet package.
PACKAGE_DATA = files("moongreet") / "data"
# Packaged SVG used when a user has no avatar of their own.
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
# Edge length used for the avatar frame, the image pixel size and pixbuf scaling.
AVATAR_SIZE = 128
# Avatar files larger than this are not loaded at all.
MAX_AVATAR_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
    """Build the faillock hint that matches a user's failed-attempt count.

    Args:
        attempt_count: Number of failed login attempts recorded so far.
        strings: Localized strings to use; loaded via load_strings() when omitted.

    Returns:
        The "locked" text once no attempts remain, the "attempts remaining"
        text when exactly one attempt is left, otherwise None.
    """
    texts = load_strings() if strings is None else strings
    attempts_left = FAILLOCK_MAX_ATTEMPTS - attempt_count
    if attempts_left <= 0:
        return texts.faillock_locked
    # Only the very last remaining attempt triggers a warning.
    return (
        texts.faillock_attempts_remaining.format(n=attempts_left)
        if attempts_left == 1
        else None
    )
def _build_wallpaper_widget(bg_path: Path | None) -> Gtk.Widget:
    """Return an expanding background widget for the given wallpaper path.

    A Gtk.Picture showing the file (content fit COVER) is used when the path
    is set and exists; otherwise an empty Gtk.Box takes its place. In both
    cases the widget expands horizontally and vertically.
    """
    if bg_path and bg_path.exists():
        widget = Gtk.Picture()
        widget.set_filename(str(bg_path))
        widget.set_content_fit(Gtk.ContentFit.COVER)
    else:
        # No usable wallpaper: an empty box still claims the space.
        widget = Gtk.Box()
    widget.set_hexpand(True)
    widget.set_vexpand(True)
    return widget
class WallpaperWindow(Gtk.ApplicationWindow):
    """Wallpaper-only window, used for secondary monitors."""

    def __init__(self, bg_path: Path | None = None, **kwargs) -> None:
        """Create the window and make the wallpaper widget its only child.

        Args:
            bg_path: Wallpaper image path passed to _build_wallpaper_widget, or None.
            **kwargs: Forwarded unchanged to Gtk.ApplicationWindow.
        """
        super().__init__(**kwargs)
        wallpaper = _build_wallpaper_widget(bg_path)
        self.set_default_size(1920, 1080)
        self.add_css_class("greeter")
        self.set_child(wallpaper)
class GreeterWindow(Gtk.ApplicationWindow):
"""The main greeter window with login UI."""
def __init__(self, bg_path: Path | None = None, config: Config | None = None, **kwargs) -> None:
    """Initialise greeter state, build the widget tree and hook the realize signal.

    Args:
        bg_path: Wallpaper image path used for the background widget, or None.
        config: Already-loaded configuration; load_config() is called when omitted.
        **kwargs: Forwarded unchanged to the parent window constructor.
    """
    super().__init__(**kwargs)
    self.add_css_class("greeter")
    self.set_default_size(1920, 1080)

    # Configuration and data gathered once at construction time
    self._config = load_config() if config is None else config
    self._strings = load_strings()
    self._users = get_users()
    self._sessions = get_sessions()
    self._bg_path = bg_path

    # Login / greetd IPC state
    self._selected_user: User | None = None
    self._greetd_sock: socket.socket | None = None
    self._greetd_sock_lock = threading.Lock()
    self._login_cancelled = threading.Event()
    self._failed_attempts: dict[str, int] = {}

    # Avatar pixbufs kept around so they are only decoded once
    self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
    self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}

    self._apply_global_theme()
    self._build_ui()
    self._setup_keyboard_navigation()

    # Initial user selection waits for "realize" so get_color() yields the
    # actual theme foreground used for SVG tinting.
    self.connect("realize", self._on_realize)
def _on_realize(self, widget: Gtk.Widget) -> None:
    """React to the window's "realize" signal by scheduling the initial user selection.

    Selection is postponed out of __init__ so get_color() returns real theme
    values for SVG tinting, and it goes through GLib.idle_add so the first
    frame can render before avatar loading occupies the main loop.
    """
    select_once = self._select_initial_user
    GLib.idle_add(select_once)
def _build_ui(self) -> None:
    """Assemble the widget tree: wallpaper, centered login box and bottom bar."""

    def _expanding_spacer() -> Gtk.Box:
        # Empty box that soaks up spare vertical space.
        filler = Gtk.Box()
        filler.set_vexpand(True)
        return filler

    # The overlay is the window's root; the wallpaper is its base child.
    root = Gtk.Overlay()
    self.set_child(root)
    root.set_child(_build_wallpaper_widget(self._bg_path))

    # Vertical column: spacer, login box, spacer — keeps the login box centered.
    column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    column.set_hexpand(True)
    column.set_vexpand(True)
    root.add_overlay(column)

    login_area = self._build_login_box()
    login_area.set_halign(Gtk.Align.CENTER)
    for child in (_expanding_spacer(), login_area, _expanding_spacer()):
        column.append(child)

    # Second overlay layer pinned to the bottom: user list and power buttons.
    footer = self._build_bottom_bar()
    footer.set_valign(Gtk.Align.END)
    root.add_overlay(footer)
def _build_login_box(self) -> Gtk.Box:
    """Create the central login column and return it.

    The column contains, top to bottom: avatar, username label, session
    dropdown, password entry and an initially hidden error label. The
    widgets that other methods update are stored on self.
    """
    login_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    login_box.add_css_class("login-box")
    login_box.set_halign(Gtk.Align.CENTER)
    login_box.set_valign(Gtk.Align.CENTER)
    login_box.set_spacing(12)

    # Avatar: the frame hides overflow so the image can be clipped to a round shape.
    self._avatar_image = Gtk.Image()
    self._avatar_image.set_pixel_size(AVATAR_SIZE)
    frame = Gtk.Box()
    frame.set_size_request(AVATAR_SIZE, AVATAR_SIZE)
    frame.set_halign(Gtk.Align.CENTER)
    frame.set_overflow(Gtk.Overflow.HIDDEN)
    frame.add_css_class("avatar")
    frame.append(self._avatar_image)
    login_box.append(frame)

    # Username
    self._username_label = Gtk.Label(label="")
    self._username_label.add_css_class("username-label")
    login_box.append(self._username_label)

    # Session chooser — gets a model only when sessions were found
    dropdown = Gtk.DropDown()
    dropdown.add_css_class("session-dropdown")
    dropdown.set_hexpand(True)
    if self._sessions:
        dropdown.set_model(
            Gtk.StringList.new([entry.name for entry in self._sessions])
        )
    self._session_dropdown = dropdown
    login_box.append(dropdown)

    # Password field; pressing Enter triggers the login attempt
    password = Gtk.PasswordEntry()
    password.set_hexpand(True)
    password.set_property("placeholder-text", self._strings.password_placeholder)
    password.set_property("show-peek-icon", True)
    password.add_css_class("password-entry")
    password.connect("activate", self._on_login_activate)
    self._password_entry = password
    login_box.append(password)

    # Error line stays invisible until _show_error() fills it
    error = Gtk.Label(label="")
    error.add_css_class("error-label")
    error.set_visible(False)
    self._error_label = error
    login_box.append(error)

    return login_box
def _build_bottom_bar(self) -> Gtk.Box:
    """Create the bottom bar: user buttons on the left, power buttons on the right."""
    bottom = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
    bottom.set_hexpand(True)
    bottom.set_margin_start(16)
    bottom.set_margin_end(16)
    bottom.set_margin_bottom(16)

    # Left: one button per known user
    users_column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    users_column.add_css_class("user-list")
    users_column.set_halign(Gtk.Align.START)
    users_column.set_valign(Gtk.Align.END)
    for account in self._users:
        entry = Gtk.Button(label=account.display_name)
        entry.add_css_class("user-list-item")
        entry.connect("clicked", self._on_user_clicked, account)
        users_column.append(entry)
    bottom.append(users_column)

    # Middle: stretchable gap that pushes the power buttons to the right
    gap = Gtk.Box()
    gap.set_hexpand(True)
    bottom.append(gap)

    # Right: reboot and shutdown, built from one description table
    power_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
    power_row.set_halign(Gtk.Align.END)
    power_row.set_valign(Gtk.Align.END)
    power_row.set_spacing(8)
    power_specs = (
        ("system-reboot-symbolic", self._strings.reboot_tooltip, self._on_reboot_clicked),
        ("system-shutdown-symbolic", self._strings.shutdown_tooltip, self._on_shutdown_clicked),
    )
    for icon_name, tooltip, handler in power_specs:
        power_button = Gtk.Button()
        power_button.set_icon_name(icon_name)
        power_button.add_css_class("power-button")
        power_button.set_tooltip_text(tooltip)
        power_button.connect("clicked", handler)
        power_row.append(power_button)
    bottom.append(power_row)

    return bottom
def _select_initial_user(self) -> bool:
"""Select the last user or the first available user.
Returns False to deregister from GLib.idle_add after a single invocation.
"""
if not self._users:
return False
# Try to load last user
last_username = self._load_last_user()
target_user = None
if last_username:
for user in self._users:
if user.username == last_username:
target_user = user
break
if target_user is None:
target_user = self._users[0]
self._switch_to_user(target_user)
return False
def _switch_to_user(self, user: User) -> None:
    """Make *user* the active login target and refresh the login area.

    Resets the password field and error label, shows the user's avatar
    (cached, from file, or the default), pre-selects their last session and
    moves keyboard focus to the password entry.
    """
    self._selected_user = user
    self._username_label.set_text(user.display_name)
    self._password_entry.set_text("")
    self._error_label.set_visible(False)

    login_name = user.username
    if login_name not in self._avatar_cache:
        avatar_file = get_avatar_path(login_name, home_dir=user.home)
        if avatar_file and avatar_file.exists():
            self._set_avatar_from_file(avatar_file, login_name)
        else:
            # The default avatar is read via Traversable.read_text(), which
            # also works from ZIP wheels, so no exists() check is needed.
            self._set_default_avatar()
    else:
        # Already decoded for this user — reuse the pixbuf.
        self._avatar_image.set_from_pixbuf(self._avatar_cache[login_name])

    self._select_last_session(user)
    self._password_entry.grab_focus()
def _apply_global_theme(self) -> None:
    """Set the GTK theme named in the configuration, if one is configured.

    Does nothing when no theme name is set or when no default Gtk.Settings
    object is available.
    """
    theme_name = self._config.gtk_theme
    settings = Gtk.Settings.get_default() if theme_name else None
    if settings is not None:
        settings.set_property("gtk-theme-name", theme_name)
def _get_foreground_color(self) -> str:
"""Get the current GTK theme foreground color as a hex string."""
rgba = self.get_color()
r = int(rgba.red * 255)
g = int(rgba.green * 255)
b = int(rgba.blue * 255)
return f"#{r:02x}{g:02x}{b:02x}"
def _set_default_avatar(self) -> None:
    """Show the packaged default avatar, tinted with the GTK foreground color.

    The tinted pixbuf is built once and cached on the instance. If the SVG
    cannot be read or decoded — or the loader yields no pixbuf — the
    "avatar-default-symbolic" icon is shown instead, so a previously shown
    avatar never stays on screen for the wrong user.
    """
    if self._default_avatar_pixbuf:
        self._avatar_image.set_from_pixbuf(self._default_avatar_pixbuf)
        return

    pixbuf = None
    try:
        # The packaged SVG is read as UTF-8 regardless of the process locale.
        svg_text = DEFAULT_AVATAR_PATH.read_text(encoding="utf-8")
        svg_text = svg_text.replace("#PLACEHOLDER", self._get_foreground_color())
        loader = GdkPixbuf.PixbufLoader.new_with_type("svg")
        try:
            loader.set_size(AVATAR_SIZE, AVATAR_SIZE)
            loader.write(svg_text.encode("utf-8"))
        finally:
            # Close the loader even when write() failed.
            loader.close()
        pixbuf = loader.get_pixbuf()
    except (GLib.Error, OSError, UnicodeDecodeError) as exc:
        logger.warning("Could not load default avatar: %s", exc)

    if pixbuf is None:
        self._avatar_image.set_from_icon_name("avatar-default-symbolic")
        return
    self._default_avatar_pixbuf = pixbuf
    self._avatar_image.set_from_pixbuf(pixbuf)
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
    """Load an image file and show it as the avatar, scaled to AVATAR_SIZE.

    Args:
        path: Image file to load.
        username: When given, the decoded pixbuf is cached under this name.

    Files larger than MAX_AVATAR_FILE_SIZE are not loaded. If the file is
    too large, cannot be stat'ed, or cannot be decoded, the
    "avatar-default-symbolic" icon is shown instead.
    """
    try:
        too_large = path.stat().st_size > MAX_AVATAR_FILE_SIZE
        pixbuf = None if too_large else GdkPixbuf.Pixbuf.new_from_file_at_scale(
            str(path), AVATAR_SIZE, AVATAR_SIZE, True
        )
    except (GLib.Error, OSError):
        # OSError covers stat() failures such as a vanished or unreadable
        # file; GLib.Error covers decoding problems.
        pixbuf = None

    if pixbuf is None:
        self._avatar_image.set_from_icon_name("avatar-default-symbolic")
        return
    if username:
        self._avatar_cache[username] = pixbuf
    self._avatar_image.set_from_pixbuf(pixbuf)
def _setup_keyboard_navigation(self) -> None:
    """Attach a key controller to the window that routes key presses to _on_key_pressed."""
    key_events = Gtk.EventControllerKey()
    key_events.connect("key-pressed", self._on_key_pressed)
    self.add_controller(key_events)
def _on_key_pressed(
    self,
    controller: Gtk.EventControllerKey,
    keyval: int,
    keycode: int,
    state: Gdk.ModifierType,
) -> bool:
    """Window-wide key handler: Escape clears the password and hides the error.

    Returns:
        True when the key was Escape and has been handled here, False for
        every other key.
    """
    if keyval != Gdk.KEY_Escape:
        return False
    self._password_entry.set_text("")
    self._error_label.set_visible(False)
    return True
def _on_user_clicked(self, button: Gtk.Button, user: User) -> None:
    """React to a click in the user list.

    Any login still in flight is cancelled first, then the login area is
    switched over to the clicked user.
    """
    # Cancel before switching so the running worker belongs to the old user only.
    self._cancel_pending_session()
    self._switch_to_user(user)
def _on_login_activate(self, entry: Gtk.PasswordEntry) -> None:
    """Start a login when Enter is pressed in the password field.

    Nothing happens without a selected user; without a selected session an
    error message is shown instead of attempting the login.
    """
    target = self._selected_user
    if not target:
        return

    typed_password = entry.get_text()
    chosen_session = self._get_selected_session()
    if chosen_session:
        self._attempt_login(target, typed_password, chosen_session)
        return
    self._show_error(self._strings.no_session_selected)
def _validate_greetd_sock(self, sock_path: str) -> bool:
"""Validate that GREETD_SOCK points to an absolute path and a real socket."""
path = Path(sock_path)
if not path.is_absolute():
self._show_error(self._strings.greetd_sock_not_absolute)
return False
try:
mode = path.stat().st_mode
if not stat.S_ISSOCK(mode):
self._show_error(self._strings.greetd_sock_not_socket)
return False
except OSError:
self._show_error(self._strings.greetd_sock_unreachable)
return False
return True
def _close_greetd_sock(self) -> None:
"""Close the greetd socket and reset the reference."""
with self._greetd_sock_lock:
if self._greetd_sock:
try:
self._greetd_sock.close()
except OSError:
pass
self._greetd_sock = None
def _set_login_sensitive(self, sensitive: bool) -> None:
"""Enable or disable login controls during authentication."""
self._password_entry.set_sensitive(sensitive)
self._session_dropdown.set_sensitive(sensitive)
def _attempt_login(self, user: User, password: str, session: Session) -> None:
    """Validate the greetd socket setting and start the login on a worker thread.

    Shows an error and returns when GREETD_SOCK is unset or fails
    validation. Otherwise the cancellation flag is cleared, the login
    controls are disabled and _login_worker is started as a daemon thread.
    """
    sock_path = os.getenv("GREETD_SOCK")
    if not sock_path:
        self._show_error(self._strings.greetd_sock_not_set)
        return
    if not self._validate_greetd_sock(sock_path):
        return

    # The IPC runs off the main loop; keep the controls locked meanwhile.
    self._login_cancelled.clear()
    self._set_login_sensitive(False)
    threading.Thread(
        target=self._login_worker,
        args=(user, password, session, sock_path),
        daemon=True,
    ).start()
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
    """Run the greetd login conversation on a background thread.

    Steps: connect to the greetd socket, create a session (cancelling and
    retrying once if greetd reports an error), answer the auth prompt with
    *password*, then start *session*. UI updates are handed to the GTK main
    thread through GLib.idle_add. After every IPC step the cancellation
    event is checked; a cancelled worker exits without reporting anything.

    The worker always closes the socket it created itself and only clears
    the shared reference when it still points at that socket, so a failed
    connect() does not leak the socket and a late-finishing worker cannot
    close a socket that belongs to a newer attempt.

    Args:
        user: Account to authenticate.
        password: Password typed by the user.
        session: Session whose exec_cmd is started on success.
        sock_path: Path of the greetd UNIX socket.
    """
    # NOTE(review): the cancellation event is shared by all attempts and is
    # cleared when a new attempt starts, so a worker that was cancelled but
    # has not yet noticed may report an error — confirm whether a per-attempt
    # token is wanted.
    sock: socket.socket | None = None

    def _quit_application() -> bool:
        # Runs on the GTK main thread so no GTK API is touched from the worker.
        app = self.get_application()
        if app is not None:
            app.quit()
        return False

    try:
        if self._login_cancelled.is_set():
            return
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(10.0)
        sock.connect(sock_path)
        with self._greetd_sock_lock:
            self._greetd_sock = sock
        # A cancel issued while connecting found no socket to close yet.
        if self._login_cancelled.is_set():
            return

        # Step 1: Create session — if a stale session exists, cancel it and retry
        response = create_session(sock, user.username)
        if self._login_cancelled.is_set():
            return
        if response.get("type") == "error":
            cancel_session(sock)
            response = create_session(sock, user.username)
            if self._login_cancelled.is_set():
                return
            if response.get("type") == "error":
                GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
                return

        # Step 2: Send password if auth message received
        if response.get("type") == "auth_message":
            response = post_auth_response(sock, password)
            if self._login_cancelled.is_set():
                return
            if response.get("type") == "error":
                self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
                warning = faillock_warning(self._failed_attempts[user.username], self._strings)
                cancel_session(sock)
                GLib.idle_add(self._on_login_auth_error, response, warning)
                return
            if response.get("type") == "auth_message":
                # Multi-stage auth (e.g. TOTP) is not supported
                cancel_session(sock)
                GLib.idle_add(self._on_login_error, None, self._strings.multi_stage_unsupported)
                return

        # Step 3: Start session
        if response.get("type") == "success":
            cmd = shlex.split(session.exec_cmd)
            if not cmd or not shutil.which(cmd[0]):
                cancel_session(sock)
                GLib.idle_add(self._on_login_error, None, self._strings.invalid_session_command)
                return
            response = start_session(sock, cmd)
            if self._login_cancelled.is_set():
                return
            if response.get("type") == "success":
                self._save_last_user(user.username)
                self._save_last_session(user.username, session.name)
                GLib.idle_add(_quit_application)
            else:
                GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
            return

        # Any other response type: report it instead of returning silently,
        # which would leave the login controls disabled.
        logger.error("Unexpected greetd response type: %r", response.get("type"))
        GLib.idle_add(self._on_login_error, None, self._strings.auth_failed)
    except (ConnectionError, OSError, ValueError) as e:
        if self._login_cancelled.is_set():
            # Socket was closed by _cancel_pending_session — exit silently
            return
        logger.error("greetd IPC error: %s", e)
        if isinstance(e, ConnectionError):
            GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
        else:
            GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
    finally:
        if sock is not None:
            with self._greetd_sock_lock:
                # Only clear the shared reference if it is still ours.
                if self._greetd_sock is sock:
                    self._greetd_sock = None
            try:
                sock.close()
            except OSError:
                pass
def _on_login_error(self, response: dict | None, message: str) -> None:
"""Handle login error on the GTK main thread."""
if response:
self._show_greetd_error(response, message)
else:
self._show_error(message)
self._set_login_sensitive(True)
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
"""Handle authentication failure with optional faillock warning on the GTK main thread."""
self._show_greetd_error(response, self._strings.wrong_password)
if warning:
current = self._error_label.get_text()
self._error_label.set_text(f"{current}\n{warning}")
self._set_login_sensitive(True)
def _cancel_pending_session(self) -> None:
"""Cancel any in-progress greetd session.
Sets the cancellation event and closes the socket to interrupt
any blocking I/O in the login worker. The worker checks the
event and exits silently instead of showing an error.
"""
self._login_cancelled.set()
self._close_greetd_sock()
def _get_selected_session(self) -> Session | None:
    """Return the session matching the dropdown's selected index.

    Returns:
        The selected Session, or None when there are no sessions or the
        dropdown's index does not point into the session list.
    """
    available = self._sessions
    if available:
        position = self._session_dropdown.get_selected()
        if position < len(available):
            return available[position]
    return None
def _select_last_session(self, user: User) -> None:
    """Move the dropdown to the session *user* picked last time, if it still exists.

    The dropdown is left untouched when there are no sessions, nothing is
    cached for the user, or the cached name matches no known session.
    """
    if not self._sessions:
        return
    wanted = self._load_last_session(user.username)
    if not wanted:
        return
    # Index of the first session carrying the remembered name, if any.
    position = next(
        (index for index, candidate in enumerate(self._sessions) if candidate.name == wanted),
        None,
    )
    if position is not None:
        self._session_dropdown.set_selected(position)
# Longest greetd error description shown verbatim; longer ones are replaced by the fallback text.
MAX_GREETD_ERROR_LENGTH = 200
def _show_greetd_error(self, response: dict, fallback: str) -> None:
"""Display an error from greetd, using a fallback for missing or oversized descriptions."""
description = response.get("description", "")
if description and len(description) <= self.MAX_GREETD_ERROR_LENGTH:
self._show_error(description)
else:
self._show_error(fallback)
def _show_error(self, message: str) -> None:
"""Display an error message below the password field."""
self._error_label.set_text(message)
self._error_label.set_visible(True)
self._password_entry.set_text("")
self._password_entry.grab_focus()
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
    """Handle reboot button click: disable the button and reboot in the background."""
    button.set_sensitive(False)
    threading.Thread(
        target=self._power_worker,
        args=(reboot, self._strings.reboot_failed, button),
        daemon=True,
    ).start()

def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
    """Handle shutdown button click: disable the button and shut down in the background."""
    button.set_sensitive(False)
    threading.Thread(
        target=self._power_worker,
        args=(shutdown, self._strings.shutdown_failed, button),
        daemon=True,
    ).start()

def _power_worker(self, action, error_msg: str, button: Gtk.Button | None = None) -> None:
    """Run a power action in a background thread to avoid blocking the GTK main loop.

    Args:
        action: Zero-argument callable that performs the power action.
        error_msg: Message to show when the action fails.
        button: Button that triggered the action; re-enabled on failure so
            the user can try again. Optional for backward compatibility.

    Failures are reported on the GTK main thread. OSError is handled in
    addition to the subprocess errors so that a command that cannot be
    launched at all also produces a visible error instead of a silently
    dying thread.
    """
    try:
        action()
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as exc:
        logger.error("Power action failed: %s", exc)
        GLib.idle_add(self._on_power_action_failed, error_msg, button)

def _on_power_action_failed(self, error_msg: str, button: Gtk.Button | None) -> bool:
    """Show the power-action error and re-enable the triggering button (main thread).

    Returns:
        False, so GLib.idle_add does not call this again.
    """
    self._show_error(error_msg)
    if button is not None:
        button.set_sensitive(True)
    return False
@staticmethod
def _load_last_user() -> str | None:
    """Load the last logged-in username from cache.

    Returns:
        The cached username, or None when the cache file is missing,
        unreadable, not decodable as text, longer than MAX_USERNAME_LENGTH
        or not matching VALID_USERNAME.
    """
    try:
        # Read directly instead of checking exists() first: a missing file
        # surfaces as OSError, and there is no window between check and read.
        username = LAST_USER_PATH.read_text().strip()
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError: a corrupt cache file must not break start-up.
        return None
    if len(username) > MAX_USERNAME_LENGTH or not VALID_USERNAME.match(username):
        return None
    return username
@staticmethod
def _save_last_user(username: str) -> None:
    """Write *username* to the last-user cache file, creating its directory if needed.

    Best effort: an OSError is swallowed because the cache directory may
    not be writable and the cache is not critical.
    """
    cache_dir = LAST_USER_PATH.parent
    try:
        cache_dir.mkdir(parents=True, exist_ok=True)
        LAST_USER_PATH.write_text(username)
    except OSError:
        return  # Non-critical — cache dir may not be writable
# Longest session name accepted when writing to or reading from the last-session cache.
MAX_SESSION_NAME_LENGTH = 256
@staticmethod
def _save_last_session(username: str, session_name: str) -> None:
    """Remember *session_name* as the last session used by *username*.

    Nothing is written when the username fails VALID_USERNAME or exceeds
    MAX_USERNAME_LENGTH, or when the session name is empty or longer than
    MAX_SESSION_NAME_LENGTH. Write errors are ignored (best-effort cache).
    """
    username_ok = bool(VALID_USERNAME.match(username)) and len(username) <= MAX_USERNAME_LENGTH
    if not username_ok:
        return
    session_ok = bool(session_name) and len(session_name) <= GreeterWindow.MAX_SESSION_NAME_LENGTH
    if not session_ok:
        return

    target = LAST_SESSION_DIR / username
    try:
        LAST_SESSION_DIR.mkdir(parents=True, exist_ok=True)
        target.write_text(session_name)
    except OSError:
        return  # Non-critical — cache dir may not be writable
@staticmethod
def _load_last_session(username: str) -> str | None:
    """Load the last used session name for a user from cache.

    The username is validated with the same rules _save_last_session
    applies before it is used as a file name, so a name that could never
    have been written is never used to build a path either.

    Returns:
        The cached session name, or None when the username is invalid, the
        cache file is missing, unreadable or not decodable as text, or the
        stored name is empty or longer than MAX_SESSION_NAME_LENGTH.
    """
    if not VALID_USERNAME.match(username) or len(username) > MAX_USERNAME_LENGTH:
        return None
    try:
        # A missing file raises FileNotFoundError (an OSError), so no
        # separate exists() check is needed.
        name = (LAST_SESSION_DIR / username).read_text().strip()
    except (OSError, UnicodeDecodeError):
        return None
    if not name or len(name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
        return None
    return name