fix: Security- und Quality-Hardening aus Audit

- ipc.py: _recvall() Loop für fragmentierte Socket-Reads, 64 KiB
  Payload-Limit gegen OOM
- greeter.py: GREETD_SOCK Validierung (absoluter Pfad + S_ISSOCK),
  Socket-Leak behoben (_close_greetd_sock), shlex.split() statt
  str.split() für Exec-Befehle, greetd-Fehlermeldungen auf 200
  Zeichen begrenzt, get_style_context() durch get_color() ersetzt
  (GTK 4.10+ Deprecation), Socket-Timeout (10s)
- users.py: ValueError-Absicherung bei int(uid_str), Username-
  Sanitierung gegen Pfad-Traversal, Symlink-Check bei Avatar-Pfaden
This commit is contained in:
nevaforget 2026-03-26 11:18:32 +01:00
parent 3db69e30bc
commit af01e0a44d
5 changed files with 166 additions and 28 deletions

View File

@ -2,7 +2,9 @@
# ABOUTME: Handles user selection, session choice, password entry, and power actions.
import os
import shlex
import socket
import stat
from importlib.resources import files
from pathlib import Path
@ -248,14 +250,11 @@ class GreeterWindow(Gtk.ApplicationWindow):
def _get_foreground_color(self) -> str:
"""Get the current GTK theme foreground color as a hex string."""
color = self.get_style_context().lookup_color("theme_fg_color")
if color[0]:
rgba = color[1]
r = int(rgba.red * 255)
g = int(rgba.green * 255)
b = int(rgba.blue * 255)
return f"#{r:02x}{g:02x}{b:02x}"
return "#ffffff"
rgba = self.get_color()
r = int(rgba.red * 255)
g = int(rgba.green * 255)
b = int(rgba.blue * 255)
return f"#{r:02x}{g:02x}{b:02x}"
def _set_default_avatar(self) -> None:
"""Load the default avatar SVG, tinted with the GTK foreground color."""
@ -322,6 +321,31 @@ class GreeterWindow(Gtk.ApplicationWindow):
self._attempt_login(self._selected_user, password, session)
def _validate_greetd_sock(self, sock_path: str) -> bool:
"""Validate that GREETD_SOCK points to an absolute path and a real socket."""
path = Path(sock_path)
if not path.is_absolute():
self._show_error("GREETD_SOCK ist kein absoluter Pfad")
return False
try:
mode = path.stat().st_mode
if not stat.S_ISSOCK(mode):
self._show_error("GREETD_SOCK zeigt nicht auf einen Socket")
return False
except OSError:
self._show_error("GREETD_SOCK nicht erreichbar")
return False
return True
def _close_greetd_sock(self) -> None:
"""Close the greetd socket and reset the reference."""
if self._greetd_sock:
try:
self._greetd_sock.close()
except OSError:
pass
self._greetd_sock = None
def _attempt_login(self, user: User, password: str, session: Session) -> None:
"""Attempt to authenticate and start a session via greetd IPC."""
sock_path = os.environ.get("GREETD_SOCK")
@ -329,8 +353,12 @@ class GreeterWindow(Gtk.ApplicationWindow):
self._show_error("GREETD_SOCK nicht gesetzt")
return
if not self._validate_greetd_sock(sock_path):
return
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10.0)
sock.connect(sock_path)
self._greetd_sock = sock
@ -338,8 +366,8 @@ class GreeterWindow(Gtk.ApplicationWindow):
response = create_session(sock, user.username)
if response.get("type") == "error":
self._show_error(response.get("description", "Authentifizierung fehlgeschlagen"))
sock.close()
self._show_greetd_error(response, "Authentifizierung fehlgeschlagen")
self._close_greetd_sock()
return
# Step 2: Send password if auth message received
@ -347,28 +375,30 @@ class GreeterWindow(Gtk.ApplicationWindow):
response = post_auth_response(sock, password)
if response.get("type") == "error":
self._show_error(response.get("description", "Falsches Passwort"))
sock.close()
self._show_greetd_error(response, "Falsches Passwort")
self._close_greetd_sock()
return
# Step 3: Start session
if response.get("type") == "success":
cmd = session.exec_cmd.split()
cmd = shlex.split(session.exec_cmd)
response = start_session(sock, cmd)
if response.get("type") == "success":
self._save_last_user(user.username)
sock.close()
self._close_greetd_sock()
self.get_application().quit()
return
else:
self._show_error(response.get("description", "Session konnte nicht gestartet werden"))
self._show_greetd_error(response, "Session konnte nicht gestartet werden")
sock.close()
self._close_greetd_sock()
except ConnectionError as e:
self._close_greetd_sock()
self._show_error(f"Verbindungsfehler: {e}")
except OSError as e:
self._close_greetd_sock()
self._show_error(f"Socket-Fehler: {e}")
def _cancel_pending_session(self) -> None:
@ -376,10 +406,9 @@ class GreeterWindow(Gtk.ApplicationWindow):
if self._greetd_sock:
try:
cancel_session(self._greetd_sock)
self._greetd_sock.close()
except (ConnectionError, OSError):
pass
self._greetd_sock = None
self._close_greetd_sock()
def _get_selected_session(self) -> Session | None:
"""Get the currently selected session from the dropdown."""
@ -390,6 +419,16 @@ class GreeterWindow(Gtk.ApplicationWindow):
return self._sessions[idx]
return None
MAX_GREETD_ERROR_LENGTH = 200
def _show_greetd_error(self, response: dict, fallback: str) -> None:
"""Display an error from greetd, using a fallback for missing or oversized descriptions."""
description = response.get("description", "")
if description and len(description) <= self.MAX_GREETD_ERROR_LENGTH:
self._show_error(description)
else:
self._show_error(fallback)
def _show_error(self, message: str) -> None:
"""Display an error message below the password field."""
self._error_label.set_text(message)

View File

@ -5,6 +5,19 @@ import json
import struct
from typing import Any
MAX_PAYLOAD_SIZE = 65536
def _recvall(sock: Any, n: int) -> bytes:
"""Receive exactly n bytes from socket, looping on partial reads."""
buf = bytearray()
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Connection closed while reading data")
buf.extend(chunk)
return bytes(buf)
def send_message(sock: Any, msg: dict) -> None:
"""Send a length-prefixed JSON message to the greetd socket."""
@ -15,15 +28,13 @@ def send_message(sock: Any, msg: dict) -> None:
def recv_message(sock: Any) -> dict:
"""Receive a length-prefixed JSON message from the greetd socket."""
header = sock.recv(4)
if len(header) < 4:
raise ConnectionError("Connection closed while reading message header")
header = _recvall(sock, 4)
length = struct.unpack("!I", header)[0]
payload = sock.recv(length)
if len(payload) < length:
raise ConnectionError("Connection closed while reading message body")
if length > MAX_PAYLOAD_SIZE:
raise ConnectionError(f"Payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})")
payload = _recvall(sock, length)
return json.loads(payload.decode("utf-8"))

View File

@ -42,12 +42,18 @@ def get_users(passwd_path: Path = DEFAULT_PASSWD) -> list[User]:
continue
username, _, uid_str, _, gecos, home, shell = parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], parts[6]
uid = int(uid_str)
try:
uid = int(uid_str)
except ValueError:
continue
if uid < MIN_UID or uid > MAX_UID:
continue
if shell in NOLOGIN_SHELLS:
continue
if "/" in username or username.startswith("."):
continue
users.append(User(
username=username,
@ -68,13 +74,13 @@ def get_avatar_path(
"""Find avatar for a user: AccountsService icon → ~/.face → None."""
# AccountsService icon
icon = accountsservice_dir / username
if icon.exists():
if icon.exists() and not icon.is_symlink():
return icon
# ~/.face fallback
if home_dir is not None:
face = home_dir / ".face"
if face.exists():
if face.exists() and not face.is_symlink():
return face
return None

View File

@ -42,6 +42,27 @@ class FakeSocket:
return cls(recv_data=data)
class FragmentingSocket:
"""A fake socket that delivers data in small chunks to simulate fragmentation."""
def __init__(self, data: bytes, chunk_size: int = 3):
self.sent = bytearray()
self._data = data
self._offset = 0
self._chunk_size = chunk_size
def sendall(self, data: bytes) -> None:
self.sent.extend(data)
def recv(self, n: int, flags: int = 0) -> bytes:
available = min(n, self._chunk_size, len(self._data) - self._offset)
if available <= 0:
return b""
chunk = self._data[self._offset : self._offset + available]
self._offset += available
return chunk
class TestSendMessage:
"""Tests for encoding and sending length-prefixed JSON messages."""
@ -107,6 +128,25 @@ class TestRecvMessage:
with pytest.raises(ConnectionError):
recv_message(sock)
def test_receives_fragmented_data(self) -> None:
"""recv() may return fewer bytes than requested — must loop."""
response = {"type": "success"}
payload = json.dumps(response).encode("utf-8")
data = struct.pack("!I", len(payload)) + payload
sock = FragmentingSocket(data, chunk_size=3)
result = recv_message(sock)
assert result == response
def test_rejects_oversized_payload(self) -> None:
"""Payloads exceeding the size limit must be rejected."""
header = struct.pack("!I", 10_000_000)
sock = FakeSocket(recv_data=header)
with pytest.raises(ConnectionError, match="too large"):
recv_message(sock)
class TestCreateSession:
"""Tests for the create_session greetd request."""

View File

@ -61,6 +61,32 @@ class TestGetUsers:
assert users[0].gecos == ""
assert users[0].display_name == "user"
def test_skips_invalid_uid(self, tmp_path: Path) -> None:
"""Corrupt /etc/passwd with non-numeric UID should not crash."""
passwd = tmp_path / "passwd"
passwd.write_text(
"corrupt:x:NOTANUMBER:1000:Corrupt:/home/corrupt:/bin/bash\n"
"valid:x:1000:1000:Valid:/home/valid:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].username == "valid"
def test_skips_username_with_slash(self, tmp_path: Path) -> None:
"""Usernames containing path separators should be rejected."""
passwd = tmp_path / "passwd"
passwd.write_text(
"../evil:x:1000:1000:Evil:/home/evil:/bin/bash\n"
"normal:x:1001:1001:Normal:/home/normal:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].username == "normal"
class TestGetAvatarPath:
"""Tests for avatar file lookup."""
@ -88,6 +114,22 @@ class TestGetAvatarPath:
assert result == face
def test_ignores_symlinked_face(self, tmp_path: Path) -> None:
"""~/.face as symlink should be ignored to prevent traversal."""
home = tmp_path / "home" / "attacker"
home.mkdir(parents=True)
target = tmp_path / "secret.txt"
target.write_text("sensitive data")
face = home / ".face"
face.symlink_to(target)
empty_icons = tmp_path / "no_icons"
result = get_avatar_path(
"attacker", accountsservice_dir=empty_icons, home_dir=home
)
assert result is None
def test_returns_none_when_no_avatar(self, tmp_path: Path) -> None:
empty_icons = tmp_path / "no_icons"
home = tmp_path / "home" / "nobody"