From af01e0a44d6381f4eef96616907f46472923c34e Mon Sep 17 00:00:00 2001 From: nevaforget Date: Thu, 26 Mar 2026 11:18:32 +0100 Subject: [PATCH] fix: Security- und Quality-Hardening aus Audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ipc.py: _recvall() Loop für fragmentierte Socket-Reads, 64 KiB Payload-Limit gegen OOM - greeter.py: GREETD_SOCK Validierung (absoluter Pfad + S_ISSOCK), Socket-Leak behoben (_close_greetd_sock), shlex.split() statt str.split() für Exec-Befehle, greetd-Fehlermeldungen auf 200 Zeichen begrenzt, get_style_context() durch get_color() ersetzt (GTK 4.10+ Deprecation), Socket-Timeout (10s) - users.py: ValueError-Absicherung bei int(uid_str), Username- Sanitierung gegen Pfad-Traversal, Symlink-Check bei Avatar-Pfaden --- src/moongreet/greeter.py | 75 ++++++++++++++++++++++++++++++---------- src/moongreet/ipc.py | 25 ++++++++++---- src/moongreet/users.py | 12 +++++-- tests/test_ipc.py | 40 +++++++++++++++++++++ tests/test_users.py | 42 ++++++++++++++++++++++ 5 files changed, 166 insertions(+), 28 deletions(-) diff --git a/src/moongreet/greeter.py b/src/moongreet/greeter.py index 2dc4e9c..d3461c1 100644 --- a/src/moongreet/greeter.py +++ b/src/moongreet/greeter.py @@ -2,7 +2,9 @@ # ABOUTME: Handles user selection, session choice, password entry, and power actions. import os +import shlex import socket +import stat from importlib.resources import files from pathlib import Path @@ -248,14 +250,11 @@ class GreeterWindow(Gtk.ApplicationWindow): def _get_foreground_color(self) -> str: """Get the current GTK theme foreground color as a hex string.""" - color = self.get_style_context().lookup_color("theme_fg_color") - if color[0]: - rgba = color[1] - r = int(rgba.red * 255) - g = int(rgba.green * 255) - b = int(rgba.blue * 255) - return f"#{r:02x}{g:02x}{b:02x}" - return "#ffffff" + rgba = self.get_color() + r = int(rgba.red * 255) + g = int(rgba.green * 255) + b = int(rgba.blue * 255) + return f"#{r:02x}{g:02x}{b:02x}" def _set_default_avatar(self) -> None: """Load the default avatar SVG, tinted with the GTK foreground color.""" @@ -322,6 +321,31 @@ class GreeterWindow(Gtk.ApplicationWindow): self._attempt_login(self._selected_user, password, session) + def _validate_greetd_sock(self, sock_path: str) -> bool: + """Validate that GREETD_SOCK points to an absolute path and a real socket.""" + path = Path(sock_path) + if not path.is_absolute(): + self._show_error("GREETD_SOCK ist kein absoluter Pfad") + return False + try: + mode = path.stat().st_mode + if not stat.S_ISSOCK(mode): + self._show_error("GREETD_SOCK zeigt nicht auf einen Socket") + return False + except OSError: + self._show_error("GREETD_SOCK nicht erreichbar") + return False + return True + + def _close_greetd_sock(self) -> None: + """Close the greetd socket and reset the reference.""" + if self._greetd_sock: + try: + self._greetd_sock.close() + except OSError: + pass + self._greetd_sock = None + def _attempt_login(self, user: User, password: str, session: Session) -> None: """Attempt to authenticate and start a session via greetd IPC.""" sock_path = os.environ.get("GREETD_SOCK") @@ -329,8 +353,12 @@ class GreeterWindow(Gtk.ApplicationWindow): self._show_error("GREETD_SOCK nicht gesetzt") return + if not self._validate_greetd_sock(sock_path): + return + try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(10.0) sock.connect(sock_path) self._greetd_sock = sock @@ -338,8 +366,8 @@ class GreeterWindow(Gtk.ApplicationWindow): response = create_session(sock, user.username) if response.get("type") == "error": - self._show_error(response.get("description", "Authentifizierung fehlgeschlagen")) - sock.close() + self._show_greetd_error(response, "Authentifizierung fehlgeschlagen") + self._close_greetd_sock() return # Step 2: Send password if auth message received @@ -347,28 +375,30 @@ class GreeterWindow(Gtk.ApplicationWindow): response = post_auth_response(sock, password) if response.get("type") == "error": - self._show_error(response.get("description", "Falsches Passwort")) - sock.close() + self._show_greetd_error(response, "Falsches Passwort") + self._close_greetd_sock() return # Step 3: Start session if response.get("type") == "success": - cmd = session.exec_cmd.split() + cmd = shlex.split(session.exec_cmd) response = start_session(sock, cmd) if response.get("type") == "success": self._save_last_user(user.username) - sock.close() + self._close_greetd_sock() self.get_application().quit() return else: - self._show_error(response.get("description", "Session konnte nicht gestartet werden")) + self._show_greetd_error(response, "Session konnte nicht gestartet werden") - sock.close() + self._close_greetd_sock() except ConnectionError as e: + self._close_greetd_sock() self._show_error(f"Verbindungsfehler: {e}") except OSError as e: + self._close_greetd_sock() self._show_error(f"Socket-Fehler: {e}") def _cancel_pending_session(self) -> None: @@ -376,10 +406,9 @@ class GreeterWindow(Gtk.ApplicationWindow): if self._greetd_sock: try: cancel_session(self._greetd_sock) - self._greetd_sock.close() except (ConnectionError, OSError): pass - self._greetd_sock = None + self._close_greetd_sock() def _get_selected_session(self) -> Session | None: """Get the currently selected session from the dropdown.""" @@ -390,6 +419,16 @@ class GreeterWindow(Gtk.ApplicationWindow): return self._sessions[idx] return None + MAX_GREETD_ERROR_LENGTH = 200 + + def _show_greetd_error(self, response: dict, fallback: str) -> None: + """Display an error from greetd, using a fallback for missing or oversized descriptions.""" + description = response.get("description", "") + if description and len(description) <= self.MAX_GREETD_ERROR_LENGTH: + self._show_error(description) + else: + self._show_error(fallback) + def _show_error(self, message: str) -> None: """Display an error message below the password field.""" self._error_label.set_text(message) diff --git a/src/moongreet/ipc.py b/src/moongreet/ipc.py index ce4e951..562b977 100644 --- a/src/moongreet/ipc.py +++ b/src/moongreet/ipc.py @@ -5,6 +5,19 @@ import json import struct from typing import Any +MAX_PAYLOAD_SIZE = 65536 + + +def _recvall(sock: Any, n: int) -> bytes: + """Receive exactly n bytes from socket, looping on partial reads.""" + buf = bytearray() + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("Connection closed while reading data") + buf.extend(chunk) + return bytes(buf) + def send_message(sock: Any, msg: dict) -> None: """Send a length-prefixed JSON message to the greetd socket.""" @@ -15,15 +28,13 @@ def send_message(sock: Any, msg: dict) -> None: def recv_message(sock: Any) -> dict: """Receive a length-prefixed JSON message from the greetd socket.""" - header = sock.recv(4) - if len(header) < 4: - raise ConnectionError("Connection closed while reading message header") - + header = _recvall(sock, 4) length = struct.unpack("!I", header)[0] - payload = sock.recv(length) - if len(payload) < length: - raise ConnectionError("Connection closed while reading message body") + if length > MAX_PAYLOAD_SIZE: + raise ConnectionError(f"Payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})") + + payload = _recvall(sock, length) return json.loads(payload.decode("utf-8")) diff --git a/src/moongreet/users.py b/src/moongreet/users.py index 46cd681..83c0e35 100644 --- a/src/moongreet/users.py +++ b/src/moongreet/users.py @@ -42,12 +42,18 @@ def get_users(passwd_path: Path = DEFAULT_PASSWD) -> list[User]: continue username, _, uid_str, _, gecos, home, shell = parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], parts[6] - uid = int(uid_str) + + try: + uid = int(uid_str) + except ValueError: + continue if uid < MIN_UID or uid > MAX_UID: continue if shell in NOLOGIN_SHELLS: continue + if "/" in username or username.startswith("."): + continue users.append(User( username=username, @@ -68,13 +74,13 @@ def get_avatar_path( """Find avatar for a user: AccountsService icon → ~/.face → None.""" # AccountsService icon icon = accountsservice_dir / username - if icon.exists(): + if icon.exists() and not icon.is_symlink(): return icon # ~/.face fallback if home_dir is not None: face = home_dir / ".face" - if face.exists(): + if face.exists() and not face.is_symlink(): return face return None diff --git a/tests/test_ipc.py b/tests/test_ipc.py index ab1cee1..6bbd608 100644 --- a/tests/test_ipc.py +++ b/tests/test_ipc.py @@ -42,6 +42,27 @@ class FakeSocket: return cls(recv_data=data) +class FragmentingSocket: + """A fake socket that delivers data in small chunks to simulate fragmentation.""" + + def __init__(self, data: bytes, chunk_size: int = 3): + self.sent = bytearray() + self._data = data + self._offset = 0 + self._chunk_size = chunk_size + + def sendall(self, data: bytes) -> None: + self.sent.extend(data) + + def recv(self, n: int, flags: int = 0) -> bytes: + available = min(n, self._chunk_size, len(self._data) - self._offset) + if available <= 0: + return b"" + chunk = self._data[self._offset : self._offset + available] + self._offset += available + return chunk + + class TestSendMessage: """Tests for encoding and sending length-prefixed JSON messages.""" @@ -107,6 +128,25 @@ class TestRecvMessage: with pytest.raises(ConnectionError): recv_message(sock) + def test_receives_fragmented_data(self) -> None: + """recv() may return fewer bytes than requested — must loop.""" + response = {"type": "success"} + payload = json.dumps(response).encode("utf-8") + data = struct.pack("!I", len(payload)) + payload + sock = FragmentingSocket(data, chunk_size=3) + + result = recv_message(sock) + + assert result == response + + def test_rejects_oversized_payload(self) -> None: + """Payloads exceeding the size limit must be rejected.""" + header = struct.pack("!I", 10_000_000) + sock = FakeSocket(recv_data=header) + + with pytest.raises(ConnectionError, match="too large"): + recv_message(sock) + class TestCreateSession: """Tests for the create_session greetd request.""" diff --git a/tests/test_users.py b/tests/test_users.py index beb11cc..0968795 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -61,6 +61,32 @@ class TestGetUsers: assert users[0].gecos == "" assert users[0].display_name == "user" + def test_skips_invalid_uid(self, tmp_path: Path) -> None: + """Corrupt /etc/passwd with non-numeric UID should not crash.""" + passwd = tmp_path / "passwd" + passwd.write_text( + "corrupt:x:NOTANUMBER:1000:Corrupt:/home/corrupt:/bin/bash\n" + "valid:x:1000:1000:Valid:/home/valid:/bin/bash\n" + ) + + users = get_users(passwd_path=passwd) + + assert len(users) == 1 + assert users[0].username == "valid" + + def test_skips_username_with_slash(self, tmp_path: Path) -> None: + """Usernames containing path separators should be rejected.""" + passwd = tmp_path / "passwd" + passwd.write_text( + "../evil:x:1000:1000:Evil:/home/evil:/bin/bash\n" + "normal:x:1001:1001:Normal:/home/normal:/bin/bash\n" + ) + + users = get_users(passwd_path=passwd) + + assert len(users) == 1 + assert users[0].username == "normal" + class TestGetAvatarPath: """Tests for avatar file lookup.""" @@ -88,6 +114,22 @@ class TestGetAvatarPath: assert result == face + def test_ignores_symlinked_face(self, tmp_path: Path) -> None: + """~/.face as symlink should be ignored to prevent traversal.""" + home = tmp_path / "home" / "attacker" + home.mkdir(parents=True) + target = tmp_path / "secret.txt" + target.write_text("sensitive data") + face = home / ".face" + face.symlink_to(target) + empty_icons = tmp_path / "no_icons" + + result = get_avatar_path( + "attacker", accountsservice_dir=empty_icons, home_dir=home + ) + + assert result is None + def test_returns_none_when_no_avatar(self, tmp_path: Path) -> None: empty_icons = tmp_path / "no_icons" home = tmp_path / "home" / "nobody"