Harden greeter against threading issues, path traversal, and edge cases
Security: - Fix path traversal in _save/_load_last_session by rejecting usernames starting with dot (blocks '..' and hidden file creation) - Add avatar file size limit (10 MB) to prevent DoS via large ~/.face - Add session_name length validation on write (symmetric with read) - Add payload size check to send_message (symmetric with recv_message) - Set log level to INFO in production (was DEBUG) Quality: - Eliminate main-thread blocking on user switch: _cancel_pending_session now sets a cancellation event and closes the socket instead of doing blocking IPC. The login worker checks the event after each step. - Move power actions (reboot/shutdown) to background threads - Catch TimeoutExpired in addition to CalledProcessError for power actions - Consolidate socket cleanup in _login_worker via finally block, remove redundant _close_greetd_sock calls from error callbacks - Fix _select_initial_user to return False for GLib.idle_add deregistration - Fix context manager leak in resolve_wallpaper_path on exception - Pass Config object to GreeterWindow instead of loading it twice
This commit is contained in:
parent
cab1997dff
commit
64f08d7e8b
@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "moongreet"
|
name = "moongreet"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
description = "A greetd greeter for Wayland with GTK4"
|
description = "A greetd greeter for Wayland with GTK4"
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|||||||
@ -76,5 +76,9 @@ def resolve_wallpaper_path(
|
|||||||
return config.background, None
|
return config.background, None
|
||||||
|
|
||||||
ctx = as_file(_DEFAULT_WALLPAPER_PATH)
|
ctx = as_file(_DEFAULT_WALLPAPER_PATH)
|
||||||
|
try:
|
||||||
path = ctx.__enter__()
|
path = ctx.__enter__()
|
||||||
|
except Exception:
|
||||||
|
ctx.__exit__(None, None, None)
|
||||||
|
raise
|
||||||
return path, ctx
|
return path, ctx
|
||||||
|
|||||||
@ -18,7 +18,7 @@ gi.require_version("Gtk", "4.0")
|
|||||||
gi.require_version("Gdk", "4.0")
|
gi.require_version("Gdk", "4.0")
|
||||||
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
|
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
|
||||||
|
|
||||||
from moongreet.config import load_config, resolve_wallpaper_path
|
from moongreet.config import Config, load_config, resolve_wallpaper_path
|
||||||
from moongreet.i18n import load_strings, Strings
|
from moongreet.i18n import load_strings, Strings
|
||||||
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
|
||||||
from moongreet.users import User, get_users, get_avatar_path
|
from moongreet.users import User, get_users, get_avatar_path
|
||||||
@ -30,11 +30,12 @@ logger = logging.getLogger(__name__)
|
|||||||
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
|
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
|
||||||
LAST_SESSION_DIR = Path("/var/cache/moongreet/last-session")
|
LAST_SESSION_DIR = Path("/var/cache/moongreet/last-session")
|
||||||
FAILLOCK_MAX_ATTEMPTS = 3
|
FAILLOCK_MAX_ATTEMPTS = 3
|
||||||
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_.-]+$")
|
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]*$")
|
||||||
MAX_USERNAME_LENGTH = 256
|
MAX_USERNAME_LENGTH = 256
|
||||||
PACKAGE_DATA = files("moongreet") / "data"
|
PACKAGE_DATA = files("moongreet") / "data"
|
||||||
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
|
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
|
||||||
AVATAR_SIZE = 128
|
AVATAR_SIZE = 128
|
||||||
|
MAX_AVATAR_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
|
||||||
|
|
||||||
|
|
||||||
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
|
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
|
||||||
@ -77,18 +78,19 @@ class WallpaperWindow(Gtk.ApplicationWindow):
|
|||||||
class GreeterWindow(Gtk.ApplicationWindow):
|
class GreeterWindow(Gtk.ApplicationWindow):
|
||||||
"""The main greeter window with login UI."""
|
"""The main greeter window with login UI."""
|
||||||
|
|
||||||
def __init__(self, bg_path: Path | None = None, **kwargs) -> None:
|
def __init__(self, bg_path: Path | None = None, config: Config | None = None, **kwargs) -> None:
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.add_css_class("greeter")
|
self.add_css_class("greeter")
|
||||||
self.set_default_size(1920, 1080)
|
self.set_default_size(1920, 1080)
|
||||||
|
|
||||||
self._config = load_config()
|
self._config = config if config is not None else load_config()
|
||||||
self._strings = load_strings()
|
self._strings = load_strings()
|
||||||
self._users = get_users()
|
self._users = get_users()
|
||||||
self._sessions = get_sessions()
|
self._sessions = get_sessions()
|
||||||
self._selected_user: User | None = None
|
self._selected_user: User | None = None
|
||||||
self._greetd_sock: socket.socket | None = None
|
self._greetd_sock: socket.socket | None = None
|
||||||
self._greetd_sock_lock = threading.Lock()
|
self._greetd_sock_lock = threading.Lock()
|
||||||
|
self._login_cancelled = threading.Event()
|
||||||
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
|
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
|
||||||
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
|
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
|
||||||
self._failed_attempts: dict[str, int] = {}
|
self._failed_attempts: dict[str, int] = {}
|
||||||
@ -247,10 +249,13 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
|
|
||||||
return bar
|
return bar
|
||||||
|
|
||||||
def _select_initial_user(self) -> None:
|
def _select_initial_user(self) -> bool:
|
||||||
"""Select the last user or the first available user."""
|
"""Select the last user or the first available user.
|
||||||
|
|
||||||
|
Returns False to deregister from GLib.idle_add after a single invocation.
|
||||||
|
"""
|
||||||
if not self._users:
|
if not self._users:
|
||||||
return
|
return False
|
||||||
|
|
||||||
# Try to load last user
|
# Try to load last user
|
||||||
last_username = self._load_last_user()
|
last_username = self._load_last_user()
|
||||||
@ -266,6 +271,7 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
target_user = self._users[0]
|
target_user = self._users[0]
|
||||||
|
|
||||||
self._switch_to_user(target_user)
|
self._switch_to_user(target_user)
|
||||||
|
return False
|
||||||
|
|
||||||
def _switch_to_user(self, user: User) -> None:
|
def _switch_to_user(self, user: User) -> None:
|
||||||
"""Update the UI to show the selected user."""
|
"""Update the UI to show the selected user."""
|
||||||
@ -338,6 +344,9 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
|
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
|
||||||
"""Load an image file and set it as the avatar, scaled to AVATAR_SIZE."""
|
"""Load an image file and set it as the avatar, scaled to AVATAR_SIZE."""
|
||||||
try:
|
try:
|
||||||
|
if path.stat().st_size > MAX_AVATAR_FILE_SIZE:
|
||||||
|
self._avatar_image.set_from_icon_name("avatar-default-symbolic")
|
||||||
|
return
|
||||||
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
|
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
|
||||||
str(path), AVATAR_SIZE, AVATAR_SIZE, True
|
str(path), AVATAR_SIZE, AVATAR_SIZE, True
|
||||||
)
|
)
|
||||||
@ -427,6 +436,7 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Disable UI while authenticating — the IPC runs in a background thread
|
# Disable UI while authenticating — the IPC runs in a background thread
|
||||||
|
self._login_cancelled.clear()
|
||||||
self._set_login_sensitive(False)
|
self._set_login_sensitive(False)
|
||||||
thread = threading.Thread(
|
thread = threading.Thread(
|
||||||
target=self._login_worker,
|
target=self._login_worker,
|
||||||
@ -438,6 +448,9 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
|
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
|
||||||
"""Run greetd IPC in a background thread to avoid blocking the GTK main loop."""
|
"""Run greetd IPC in a background thread to avoid blocking the GTK main loop."""
|
||||||
try:
|
try:
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
sock.settimeout(10.0)
|
sock.settimeout(10.0)
|
||||||
sock.connect(sock_path)
|
sock.connect(sock_path)
|
||||||
@ -447,9 +460,14 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
# Step 1: Create session — if a stale session exists, cancel it and retry
|
# Step 1: Create session — if a stale session exists, cancel it and retry
|
||||||
response = create_session(sock, user.username)
|
response = create_session(sock, user.username)
|
||||||
|
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if response.get("type") == "error":
|
if response.get("type") == "error":
|
||||||
cancel_session(sock)
|
cancel_session(sock)
|
||||||
response = create_session(sock, user.username)
|
response = create_session(sock, user.username)
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
if response.get("type") == "error":
|
if response.get("type") == "error":
|
||||||
GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
|
GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
|
||||||
return
|
return
|
||||||
@ -458,6 +476,9 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
if response.get("type") == "auth_message":
|
if response.get("type") == "auth_message":
|
||||||
response = post_auth_response(sock, password)
|
response = post_auth_response(sock, password)
|
||||||
|
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if response.get("type") == "error":
|
if response.get("type") == "error":
|
||||||
self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
|
self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
|
||||||
warning = faillock_warning(self._failed_attempts[user.username], self._strings)
|
warning = faillock_warning(self._failed_attempts[user.username], self._strings)
|
||||||
@ -480,25 +501,29 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
return
|
return
|
||||||
response = start_session(sock, cmd)
|
response = start_session(sock, cmd)
|
||||||
|
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if response.get("type") == "success":
|
if response.get("type") == "success":
|
||||||
self._save_last_user(user.username)
|
self._save_last_user(user.username)
|
||||||
self._save_last_session(user.username, session.name)
|
self._save_last_session(user.username, session.name)
|
||||||
self._close_greetd_sock()
|
|
||||||
GLib.idle_add(self.get_application().quit)
|
GLib.idle_add(self.get_application().quit)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
|
GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
|
||||||
|
return
|
||||||
|
|
||||||
self._close_greetd_sock()
|
except (ConnectionError, OSError, ValueError) as e:
|
||||||
|
if self._login_cancelled.is_set():
|
||||||
except ConnectionError as e:
|
# Socket was closed by _cancel_pending_session — exit silently
|
||||||
logger.error("greetd connection error: %s", e)
|
return
|
||||||
self._close_greetd_sock()
|
logger.error("greetd IPC error: %s", e)
|
||||||
|
if isinstance(e, ConnectionError):
|
||||||
GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
|
GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
|
||||||
except (OSError, ValueError) as e:
|
else:
|
||||||
logger.error("greetd socket error: %s", e)
|
|
||||||
self._close_greetd_sock()
|
|
||||||
GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
|
GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
|
||||||
|
finally:
|
||||||
|
self._close_greetd_sock()
|
||||||
|
|
||||||
def _on_login_error(self, response: dict | None, message: str) -> None:
|
def _on_login_error(self, response: dict | None, message: str) -> None:
|
||||||
"""Handle login error on the GTK main thread."""
|
"""Handle login error on the GTK main thread."""
|
||||||
@ -506,7 +531,6 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
self._show_greetd_error(response, message)
|
self._show_greetd_error(response, message)
|
||||||
else:
|
else:
|
||||||
self._show_error(message)
|
self._show_error(message)
|
||||||
self._close_greetd_sock()
|
|
||||||
self._set_login_sensitive(True)
|
self._set_login_sensitive(True)
|
||||||
|
|
||||||
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
|
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
|
||||||
@ -515,17 +539,16 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
if warning:
|
if warning:
|
||||||
current = self._error_label.get_text()
|
current = self._error_label.get_text()
|
||||||
self._error_label.set_text(f"{current}\n{warning}")
|
self._error_label.set_text(f"{current}\n{warning}")
|
||||||
self._close_greetd_sock()
|
|
||||||
self._set_login_sensitive(True)
|
self._set_login_sensitive(True)
|
||||||
|
|
||||||
def _cancel_pending_session(self) -> None:
|
def _cancel_pending_session(self) -> None:
|
||||||
"""Cancel any in-progress greetd session."""
|
"""Cancel any in-progress greetd session.
|
||||||
with self._greetd_sock_lock:
|
|
||||||
if self._greetd_sock:
|
Sets the cancellation event and closes the socket to interrupt
|
||||||
try:
|
any blocking I/O in the login worker. The worker checks the
|
||||||
cancel_session(self._greetd_sock)
|
event and exits silently instead of showing an error.
|
||||||
except (ConnectionError, OSError):
|
"""
|
||||||
pass
|
self._login_cancelled.set()
|
||||||
self._close_greetd_sock()
|
self._close_greetd_sock()
|
||||||
|
|
||||||
def _get_selected_session(self) -> Session | None:
|
def _get_selected_session(self) -> Session | None:
|
||||||
@ -568,17 +591,26 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
|
|
||||||
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
|
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
|
||||||
"""Handle reboot button click."""
|
"""Handle reboot button click."""
|
||||||
try:
|
button.set_sensitive(False)
|
||||||
reboot()
|
threading.Thread(
|
||||||
except subprocess.CalledProcessError:
|
target=self._power_worker, args=(reboot, self._strings.reboot_failed),
|
||||||
self._show_error(self._strings.reboot_failed)
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
|
def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
|
||||||
"""Handle shutdown button click."""
|
"""Handle shutdown button click."""
|
||||||
|
button.set_sensitive(False)
|
||||||
|
threading.Thread(
|
||||||
|
target=self._power_worker, args=(shutdown, self._strings.shutdown_failed),
|
||||||
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
|
def _power_worker(self, action, error_msg: str) -> None:
|
||||||
|
"""Run a power action in a background thread to avoid blocking the GTK main loop."""
|
||||||
try:
|
try:
|
||||||
shutdown()
|
action()
|
||||||
except subprocess.CalledProcessError:
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
|
||||||
self._show_error(self._strings.shutdown_failed)
|
GLib.idle_add(self._show_error, error_msg)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _load_last_user() -> str | None:
|
def _load_last_user() -> str | None:
|
||||||
@ -609,6 +641,8 @@ class GreeterWindow(Gtk.ApplicationWindow):
|
|||||||
"""Save the last used session name for a user to cache."""
|
"""Save the last used session name for a user to cache."""
|
||||||
if not VALID_USERNAME.match(username) or len(username) > MAX_USERNAME_LENGTH:
|
if not VALID_USERNAME.match(username) or len(username) > MAX_USERNAME_LENGTH:
|
||||||
return
|
return
|
||||||
|
if not session_name or len(session_name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
LAST_SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
LAST_SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
(LAST_SESSION_DIR / username).write_text(session_name)
|
(LAST_SESSION_DIR / username).write_text(session_name)
|
||||||
|
|||||||
@ -22,6 +22,8 @@ def _recvall(sock: Any, n: int) -> bytes:
|
|||||||
def send_message(sock: Any, msg: dict) -> None:
|
def send_message(sock: Any, msg: dict) -> None:
|
||||||
"""Send a length-prefixed JSON message to the greetd socket."""
|
"""Send a length-prefixed JSON message to the greetd socket."""
|
||||||
payload = json.dumps(msg).encode("utf-8")
|
payload = json.dumps(msg).encode("utf-8")
|
||||||
|
if len(payload) > MAX_PAYLOAD_SIZE:
|
||||||
|
raise ValueError(f"Payload too large: {len(payload)} bytes (max {MAX_PAYLOAD_SIZE})")
|
||||||
header = struct.pack("=I", len(payload))
|
header = struct.pack("=I", len(payload))
|
||||||
sock.sendall(header + payload)
|
sock.sendall(header + payload)
|
||||||
|
|
||||||
|
|||||||
@ -31,7 +31,7 @@ logger = logging.getLogger(__name__)
|
|||||||
def _setup_logging() -> None:
|
def _setup_logging() -> None:
|
||||||
"""Configure logging to file and stderr."""
|
"""Configure logging to file and stderr."""
|
||||||
root = logging.getLogger()
|
root = logging.getLogger()
|
||||||
root.setLevel(logging.DEBUG)
|
root.setLevel(logging.INFO)
|
||||||
|
|
||||||
formatter = logging.Formatter(
|
formatter = logging.Formatter(
|
||||||
"%(asctime)s %(levelname)s %(name)s: %(message)s"
|
"%(asctime)s %(levelname)s %(name)s: %(message)s"
|
||||||
@ -39,7 +39,7 @@ def _setup_logging() -> None:
|
|||||||
|
|
||||||
# Always log to stderr
|
# Always log to stderr
|
||||||
stderr_handler = logging.StreamHandler(sys.stderr)
|
stderr_handler = logging.StreamHandler(sys.stderr)
|
||||||
stderr_handler.setLevel(logging.DEBUG)
|
stderr_handler.setLevel(logging.INFO)
|
||||||
stderr_handler.setFormatter(formatter)
|
stderr_handler.setFormatter(formatter)
|
||||||
root.addHandler(stderr_handler)
|
root.addHandler(stderr_handler)
|
||||||
|
|
||||||
@ -47,7 +47,7 @@ def _setup_logging() -> None:
|
|||||||
if LOG_DIR.is_dir():
|
if LOG_DIR.is_dir():
|
||||||
try:
|
try:
|
||||||
file_handler = logging.FileHandler(LOG_FILE)
|
file_handler = logging.FileHandler(LOG_FILE)
|
||||||
file_handler.setLevel(logging.DEBUG)
|
file_handler.setLevel(logging.INFO)
|
||||||
file_handler.setFormatter(formatter)
|
file_handler.setFormatter(formatter)
|
||||||
root.addHandler(file_handler)
|
root.addHandler(file_handler)
|
||||||
except PermissionError:
|
except PermissionError:
|
||||||
@ -89,7 +89,7 @@ class MoongreetApp(Gtk.Application):
|
|||||||
primary_monitor = monitors.get_item(0)
|
primary_monitor = monitors.get_item(0)
|
||||||
|
|
||||||
# Main greeter window (login UI) on primary monitor
|
# Main greeter window (login UI) on primary monitor
|
||||||
greeter = GreeterWindow(bg_path=bg_path, application=self)
|
greeter = GreeterWindow(bg_path=bg_path, config=config, application=self)
|
||||||
if HAS_LAYER_SHELL:
|
if HAS_LAYER_SHELL:
|
||||||
self._setup_layer_shell(greeter, keyboard=True)
|
self._setup_layer_shell(greeter, keyboard=True)
|
||||||
if primary_monitor is not None:
|
if primary_monitor is not None:
|
||||||
|
|||||||
@ -245,6 +245,101 @@ class TestLoginFlow:
|
|||||||
assert mock.received[1] == {"type": "cancel_session"}
|
assert mock.received[1] == {"type": "cancel_session"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestSessionCancellation:
|
||||||
|
"""Tests for cancelling an in-progress greetd session during user switch."""
|
||||||
|
|
||||||
|
def test_cancel_closes_socket_and_sets_event(self, tmp_path: Path) -> None:
|
||||||
|
"""_cancel_pending_session should close the socket and set the cancelled event."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
|
||||||
|
# Create a real socket pair to verify close
|
||||||
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock_path = tmp_path / "test.sock"
|
||||||
|
server.bind(str(sock_path))
|
||||||
|
server.listen(1)
|
||||||
|
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
client.connect(str(sock_path))
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
win._greetd_sock = client
|
||||||
|
win._cancel_pending_session()
|
||||||
|
|
||||||
|
assert win._login_cancelled.is_set()
|
||||||
|
assert win._greetd_sock is None
|
||||||
|
|
||||||
|
def test_cancel_is_noop_without_socket(self) -> None:
|
||||||
|
"""_cancel_pending_session should be safe to call when no socket exists."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
win._greetd_sock = None
|
||||||
|
|
||||||
|
win._cancel_pending_session()
|
||||||
|
|
||||||
|
assert win._login_cancelled.is_set()
|
||||||
|
assert win._greetd_sock is None
|
||||||
|
|
||||||
|
def test_cancel_does_not_block_main_thread(self, tmp_path: Path) -> None:
|
||||||
|
"""_cancel_pending_session must not do blocking I/O — only close the socket."""
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
win._greetd_sock = sock
|
||||||
|
|
||||||
|
# Should complete nearly instantly (no IPC calls)
|
||||||
|
import time
|
||||||
|
start = time.monotonic()
|
||||||
|
win._cancel_pending_session()
|
||||||
|
elapsed = time.monotonic() - start
|
||||||
|
|
||||||
|
assert elapsed < 0.1 # No blocking I/O
|
||||||
|
|
||||||
|
def test_worker_exits_silently_when_cancelled(self, tmp_path: Path) -> None:
|
||||||
|
"""_login_worker should exit without showing an error when cancelled mid-flight."""
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from moongreet.greeter import GreeterWindow
|
||||||
|
from moongreet.users import User
|
||||||
|
|
||||||
|
win = GreeterWindow.__new__(GreeterWindow)
|
||||||
|
win._greetd_sock_lock = threading.Lock()
|
||||||
|
win._login_cancelled = threading.Event()
|
||||||
|
win._greetd_sock = None
|
||||||
|
win._failed_attempts = {}
|
||||||
|
win._strings = MagicMock()
|
||||||
|
|
||||||
|
# Set cancelled before the worker runs
|
||||||
|
win._login_cancelled.set()
|
||||||
|
|
||||||
|
# Create a socket that will fail (simulating closed socket)
|
||||||
|
sock_path = tmp_path / "greetd.sock"
|
||||||
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
server.bind(str(sock_path))
|
||||||
|
server.listen(1)
|
||||||
|
|
||||||
|
user = User(username="dom", uid=1000, gecos="Dominik", home=Path("/home/dom"), shell="/bin/zsh")
|
||||||
|
|
||||||
|
with patch("moongreet.greeter.GLib.idle_add") as mock_idle:
|
||||||
|
win._login_worker(user, "pw", MagicMock(exec_cmd="niri-session"), str(sock_path))
|
||||||
|
|
||||||
|
# Should NOT have scheduled any error callback
|
||||||
|
for call in mock_idle.call_args_list:
|
||||||
|
func = call[0][0]
|
||||||
|
assert func != win._on_login_error, "Worker should not show error when cancelled"
|
||||||
|
assert func != win._on_login_auth_error, "Worker should not show auth error when cancelled"
|
||||||
|
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
|
||||||
class TestFaillockWarning:
|
class TestFaillockWarning:
|
||||||
"""Tests for the faillock warning message logic."""
|
"""Tests for the faillock warning message logic."""
|
||||||
|
|
||||||
@ -361,3 +456,23 @@ class TestLastSession:
|
|||||||
|
|
||||||
# Should not have created any file
|
# Should not have created any file
|
||||||
assert not (tmp_path / "../../etc/evil").exists()
|
assert not (tmp_path / "../../etc/evil").exists()
|
||||||
|
|
||||||
|
def test_regex_rejects_dot_dot_username(self) -> None:
|
||||||
|
"""Username '..' must not pass VALID_USERNAME validation."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match("..") is None
|
||||||
|
|
||||||
|
def test_regex_rejects_dot_username(self) -> None:
|
||||||
|
"""Username '.' must not pass VALID_USERNAME validation."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match(".") is None
|
||||||
|
|
||||||
|
def test_regex_allows_dot_in_middle(self) -> None:
|
||||||
|
"""Usernames like 'first.last' must still be valid."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match("first.last") is not None
|
||||||
|
|
||||||
|
def test_regex_rejects_leading_dot(self) -> None:
|
||||||
|
"""Usernames starting with '.' are rejected (hidden files)."""
|
||||||
|
from moongreet.greeter import VALID_USERNAME
|
||||||
|
assert VALID_USERNAME.match(".hidden") is None
|
||||||
|
|||||||
@ -99,6 +99,14 @@ class TestSendMessage:
|
|||||||
assert decoded == msg
|
assert decoded == msg
|
||||||
|
|
||||||
|
|
||||||
|
def test_rejects_oversized_payload(self) -> None:
|
||||||
|
sock = FakeSocket()
|
||||||
|
msg = {"type": "huge", "data": "x" * 100000}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Payload too large"):
|
||||||
|
send_message(sock, msg)
|
||||||
|
|
||||||
|
|
||||||
class TestRecvMessage:
|
class TestRecvMessage:
|
||||||
"""Tests for receiving and decoding length-prefixed JSON messages."""
|
"""Tests for receiving and decoding length-prefixed JSON messages."""
|
||||||
|
|
||||||
|
|||||||
@ -27,6 +27,13 @@ class TestReboot:
|
|||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
reboot()
|
reboot()
|
||||||
|
|
||||||
|
@patch("moongreet.power.subprocess.run")
|
||||||
|
def test_raises_on_timeout(self, mock_run) -> None:
|
||||||
|
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
|
||||||
|
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
reboot()
|
||||||
|
|
||||||
|
|
||||||
class TestShutdown:
|
class TestShutdown:
|
||||||
"""Tests for the shutdown power action."""
|
"""Tests for the shutdown power action."""
|
||||||
@ -45,3 +52,10 @@ class TestShutdown:
|
|||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
shutdown()
|
shutdown()
|
||||||
|
|
||||||
|
@patch("moongreet.power.subprocess.run")
|
||||||
|
def test_raises_on_timeout(self, mock_run) -> None:
|
||||||
|
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
|
||||||
|
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
shutdown()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user