greetd-moongreet/tests/test_integration.py
nevaforget 3dfa596f9a fix: greetd-Session nach Auth-Fehler sauber canceln
Nach fehlgeschlagenem Login (falsches Passwort) wurde die greetd-Session
nicht gecancelt — beim nächsten Versuch kam "a session is already being
configured". Jetzt wird cancel_session gesendet nach Auth-Fehler, und
bei create_session-Fehler wird einmal cancel + retry versucht.

Außerdem: GTK-Theme-Name und PKGBUILD-pkgver aktualisiert.
2026-03-26 15:26:12 +01:00

364 lines
14 KiB
Python

# ABOUTME: Integration tests — verify the login flow end-to-end via a mock greetd socket.
# ABOUTME: Covers the IPC sequence create_session → post_auth_message_response → start_session, cancel_session recovery, faillock warnings, and the last-user/last-session caches.
import json
import os
import socket
import struct
import threading
from pathlib import Path
import pytest
from moongreet.greeter import faillock_warning, FAILLOCK_MAX_ATTEMPTS, LAST_SESSION_DIR
from moongreet.i18n import load_strings
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
class MockGreetd:
    """A mock greetd server that listens on a Unix socket and responds to IPC messages.

    The server accepts exactly one client connection. For every message it
    receives it records the decoded JSON and replies with the next queued
    response, in the order the responses were queued with :meth:`expect`.

    Frames in both directions are a 4-byte native-endian unsigned length
    (``struct`` format ``"=I"``) followed by a UTF-8 encoded JSON payload.
    """

    # Upper bound (seconds) that stop() waits for the server thread to finish.
    _JOIN_TIMEOUT = 1.0

    def __init__(self, sock_path: Path) -> None:
        """Remember the socket path; nothing is bound until :meth:`start`."""
        self.sock_path = sock_path
        self._responses: list[dict] = []
        self._received: list[dict] = []
        self._server: socket.socket | None = None
        self._thread: threading.Thread | None = None

    def expect(self, response: dict) -> None:
        """Queue a response to send for the next received message."""
        self._responses.append(response)

    @property
    def received(self) -> list[dict]:
        """Messages decoded from the client so far, in arrival order."""
        return self._received

    def start(self) -> None:
        """Start the mock server in a background thread."""
        self._server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._server.bind(str(self.sock_path))
        self._server.listen(1)
        self._thread = threading.Thread(target=self._serve, daemon=True)
        self._thread.start()

    @staticmethod
    def _recvall(conn: socket.socket, n: int) -> bytes:
        """Receive exactly n bytes from a socket, handling fragmented reads.

        Returns fewer than ``n`` bytes only if the peer closes the connection
        before ``n`` bytes have arrived.
        """
        buf = bytearray()
        while len(buf) < n:
            chunk = conn.recv(n - len(buf))
            if not chunk:
                break
            buf.extend(chunk)
        return bytes(buf)

    def _serve(self) -> None:
        """Serve one client: read a frame, record it, send the next queued reply."""
        server = self._server
        if server is None:
            return
        try:
            conn, _ = server.accept()
        except OSError:
            # stop() closed the listener before a client connected.
            return
        try:
            for response in self._responses:
                # Receive a message
                header = self._recvall(conn, 4)
                if len(header) < 4:
                    break
                length = struct.unpack("=I", header)[0]
                payload = self._recvall(conn, length)
                if len(payload) < length:
                    # Peer hung up mid-frame; a partial payload is not valid JSON.
                    break
                msg = json.loads(payload.decode("utf-8"))
                self._received.append(msg)
                # Send response
                resp_payload = json.dumps(response).encode("utf-8")
                conn.sendall(struct.pack("=I", len(resp_payload)) + resp_payload)
        except ConnectionError:
            # The client went away mid-exchange; the test body will surface
            # the failure, so the server thread just ends quietly.
            pass
        finally:
            conn.close()

    def stop(self) -> None:
        """Close the listening socket and wait briefly for the server thread."""
        if self._server:
            self._server.close()
        if self._thread is not None:
            # Bounded wait: the thread may still be blocked on a client that
            # never connected or never hung up, and it is a daemon anyway.
            self._thread.join(timeout=self._JOIN_TIMEOUT)
class TestLoginFlow:
    """Integration tests for the complete login flow via mock greetd.

    Each test queues the responses the mock should give, drives the IPC
    helpers over a real Unix socket, and then checks the exact sequence of
    messages the mock received.
    """

    # Response the mock gives to prompt the client for a password.
    _PASSWORD_PROMPT = {
        "type": "auth_message",
        "auth_message_type": "secret",
        "auth_message": "Password:",
    }

    @staticmethod
    def _start_mock(tmp_path: Path, *responses: dict) -> "MockGreetd":
        """Start a mock under ``tmp_path`` that replies with ``responses`` in order."""
        mock = MockGreetd(tmp_path / "greetd.sock")
        for response in responses:
            mock.expect(response)
        mock.start()
        return mock

    def test_successful_login(self, tmp_path: Path) -> None:
        """Simulate a complete successful login: create → auth → start."""
        mock = self._start_mock(
            tmp_path,
            self._PASSWORD_PROMPT,
            {"type": "success"},
            {"type": "success"},
        )
        try:
            # The context manager closes the client even if an assertion fails.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(str(mock.sock_path))
                # Step 1: Create session
                response = create_session(sock, "dominik")
                assert response["type"] == "auth_message"
                # Step 2: Send password
                response = post_auth_response(sock, "geheim")
                assert response["type"] == "success"
                # Step 3: Start session
                response = start_session(sock, ["Hyprland"])
                assert response["type"] == "success"
        finally:
            mock.stop()
        # Verify what the mock received
        assert mock.received == [
            {"type": "create_session", "username": "dominik"},
            {"type": "post_auth_message_response", "response": "geheim"},
            {"type": "start_session", "cmd": ["Hyprland"]},
        ]

    def test_wrong_password_sends_cancel(self, tmp_path: Path) -> None:
        """After a failed login, cancel_session must be sent to free the greetd session."""
        mock = self._start_mock(
            tmp_path,
            self._PASSWORD_PROMPT,
            {"type": "error", "error_type": "auth_error", "description": "Authentication failed"},
            {"type": "success"},  # Response to cancel_session
        )
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(str(mock.sock_path))
                response = create_session(sock, "dominik")
                assert response["type"] == "auth_message"
                response = post_auth_response(sock, "falsch")
                assert response["type"] == "error"
                assert response["description"] == "Authentication failed"
                # The greeter must cancel the session after auth failure
                response = cancel_session(sock)
                assert response["type"] == "success"
        finally:
            mock.stop()
        assert mock.received == [
            {"type": "create_session", "username": "dominik"},
            {"type": "post_auth_message_response", "response": "falsch"},
            {"type": "cancel_session"},
        ]

    def test_stale_session_cancel_and_retry(self, tmp_path: Path) -> None:
        """When create_session fails due to a stale session, cancel and retry."""
        mock = self._start_mock(
            tmp_path,
            # First create_session → error (stale session)
            {"type": "error", "error_type": "error", "description": "a session is already being configured"},
            # cancel_session → success
            {"type": "success"},
            # Second create_session → auth_message (retry succeeds)
            self._PASSWORD_PROMPT,
            # post_auth_response → success
            {"type": "success"},
            # start_session → success
            {"type": "success"},
        )
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(str(mock.sock_path))
                # Step 1: Create session fails
                response = create_session(sock, "dominik")
                assert response["type"] == "error"
                # Step 2: Cancel stale session
                response = cancel_session(sock)
                assert response["type"] == "success"
                # Step 3: Retry create session
                response = create_session(sock, "dominik")
                assert response["type"] == "auth_message"
                # Step 4: Send password
                response = post_auth_response(sock, "geheim")
                assert response["type"] == "success"
                # Step 5: Start session
                response = start_session(sock, ["niri-session"])
                assert response["type"] == "success"
        finally:
            mock.stop()
        assert mock.received == [
            {"type": "create_session", "username": "dominik"},
            {"type": "cancel_session"},
            {"type": "create_session", "username": "dominik"},
            {"type": "post_auth_message_response", "response": "geheim"},
            {"type": "start_session", "cmd": ["niri-session"]},
        ]

    def test_multi_stage_auth_sends_cancel(self, tmp_path: Path) -> None:
        """When greetd sends a second auth_message after password, cancel the session."""
        mock = self._start_mock(
            tmp_path,
            self._PASSWORD_PROMPT,
            {"type": "auth_message", "auth_message_type": "secret", "auth_message": "TOTP:"},
            {"type": "success"},  # Response to cancel_session
        )
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(str(mock.sock_path))
                # Step 1: Create session
                response = create_session(sock, "dominik")
                assert response["type"] == "auth_message"
                # Step 2: Send password — greetd responds with another auth_message
                response = post_auth_response(sock, "geheim")
                assert response["type"] == "auth_message"
                # Step 3: Cancel because multi-stage auth is not supported
                response = cancel_session(sock)
                assert response["type"] == "success"
        finally:
            mock.stop()
        # Verify cancel was sent
        assert mock.received == [
            {"type": "create_session", "username": "dominik"},
            {"type": "post_auth_message_response", "response": "geheim"},
            {"type": "cancel_session"},
        ]

    def test_cancel_session(self, tmp_path: Path) -> None:
        """Simulate cancelling a session after create."""
        mock = self._start_mock(
            tmp_path,
            self._PASSWORD_PROMPT,
            {"type": "success"},
        )
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(str(mock.sock_path))
                create_session(sock, "dominik")
                response = cancel_session(sock)
                assert response["type"] == "success"
        finally:
            mock.stop()
        assert mock.received == [
            {"type": "create_session", "username": "dominik"},
            {"type": "cancel_session"},
        ]
class TestFaillockWarning:
    """Checks the warning text faillock_warning yields per failed-attempt count."""

    def test_no_warning_on_zero_attempts(self) -> None:
        """Zero failed attempts produce no warning."""
        assert faillock_warning(0, load_strings("de")) is None

    def test_no_warning_on_first_attempt(self) -> None:
        """A single failed attempt produces no warning."""
        assert faillock_warning(1, load_strings("de")) is None

    def test_warning_on_second_attempt(self) -> None:
        """The second failed attempt produces a warning mentioning "1"."""
        message = faillock_warning(2, load_strings("de"))
        assert message is not None
        assert "1" in message  # 1 attempt left

    def test_warning_on_third_attempt(self) -> None:
        """The third failed attempt produces the locked message."""
        german = load_strings("de")
        message = faillock_warning(3, german)
        assert message is not None
        assert message == german.faillock_locked

    def test_warning_beyond_max_attempts(self) -> None:
        """A fourth failed attempt still produces the locked message."""
        german = load_strings("de")
        message = faillock_warning(4, german)
        assert message is not None
        assert message == german.faillock_locked

    def test_max_attempts_constant_is_three(self) -> None:
        """The attempt limit the tests above rely on is three."""
        assert FAILLOCK_MAX_ATTEMPTS == 3
class TestLastUser:
    """Round-trip and validation tests for the persisted last-user file."""

    def test_save_and_load_last_user(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A saved username lands in the file verbatim and loads back unchanged."""
        from moongreet.greeter import GreeterWindow

        # The parent directories are deliberately not created beforehand.
        last_user_file = tmp_path / "cache" / "moongreet" / "last-user"
        monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", last_user_file)

        GreeterWindow._save_last_user("dominik")

        assert last_user_file.exists()
        assert last_user_file.read_text() == "dominik"
        loaded = GreeterWindow._load_last_user()
        assert loaded == "dominik"

    def test_load_last_user_missing_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Loading yields None when the file does not exist."""
        from moongreet.greeter import GreeterWindow

        absent_file = tmp_path / "nonexistent" / "last-user"
        monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", absent_file)

        loaded = GreeterWindow._load_last_user()
        assert loaded is None

    def test_load_last_user_rejects_oversized_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A 300-character stored username is rejected (None)."""
        from moongreet.greeter import GreeterWindow

        last_user_file = tmp_path / "last-user"
        last_user_file.write_text("a" * 300)
        monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", last_user_file)

        loaded = GreeterWindow._load_last_user()
        assert loaded is None

    def test_load_last_user_rejects_invalid_characters(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A stored value that looks like a traversal path is rejected (None)."""
        from moongreet.greeter import GreeterWindow

        last_user_file = tmp_path / "last-user"
        last_user_file.write_text("../../etc/passwd")
        monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", last_user_file)

        loaded = GreeterWindow._load_last_user()
        assert loaded is None
class TestLastSession:
    """Round-trip and validation tests for the per-user last-session files."""

    def test_save_and_load_last_session(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A saved session name is stored in a file named after the user and loads back."""
        from moongreet.greeter import GreeterWindow

        monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)

        GreeterWindow._save_last_session("dominik", "Niri")

        stored = tmp_path / "dominik"
        assert stored.exists()
        assert stored.read_text() == "Niri"
        loaded = GreeterWindow._load_last_session("dominik")
        assert loaded == "Niri"

    def test_load_last_session_missing_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Loading for a user without a stored file yields None."""
        from moongreet.greeter import GreeterWindow

        monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)

        loaded = GreeterWindow._load_last_session("nobody")
        assert loaded is None

    def test_load_last_session_rejects_oversized_name(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A 300-character stored session name is rejected (None)."""
        from moongreet.greeter import GreeterWindow

        monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
        stored = tmp_path / "dominik"
        stored.write_text("A" * 300)

        loaded = GreeterWindow._load_last_session("dominik")
        assert loaded is None

    def test_save_last_session_validates_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Usernames with path traversal should not create files outside the cache dir."""
        from moongreet.greeter import GreeterWindow

        monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)

        GreeterWindow._save_last_session("../../etc/evil", "Niri")

        # Should not have created any file
        escaped_target = tmp_path / "../../etc/evil"
        assert not escaped_target.exists()