greetd-moongreet/tests/test_ipc.py
nevaforget af01e0a44d fix: Security- und Quality-Hardening aus Audit
- ipc.py: _recvall() Loop für fragmentierte Socket-Reads, 64 KiB
  Payload-Limit gegen OOM
- greeter.py: GREETD_SOCK Validierung (absoluter Pfad + S_ISSOCK),
  Socket-Leak behoben (_close_greetd_sock), shlex.split() statt
  str.split() für Exec-Befehle, greetd-Fehlermeldungen auf 200
  Zeichen begrenzt, get_style_context() durch get_color() ersetzt
  (GTK 4.10+ Deprecation), Socket-Timeout (10s)
- users.py: ValueError-Absicherung bei int(uid_str), Username-
  Sanitierung gegen Pfad-Traversal, Symlink-Check bei Avatar-Pfaden
2026-03-26 11:18:32 +01:00

244 lines
7.6 KiB
Python

# ABOUTME: Tests for greetd IPC protocol — socket communication with length-prefixed JSON.
# ABOUTME: Uses mock sockets to verify message encoding/decoding and greetd request types.
import json
import struct
import socket
from unittest.mock import MagicMock, patch
import pytest
from moongreet.ipc import (
send_message,
recv_message,
create_session,
post_auth_response,
start_session,
cancel_session,
)
class FakeSocket:
    """In-memory socket stand-in that logs writes and replays a fixed read buffer.

    Bytes passed to ``sendall`` accumulate in ``sent``; ``recv`` hands back
    successive slices of the buffer supplied at construction time.
    """

    def __init__(self, recv_data: bytes = b""):
        self._recv_offset = 0
        self._recv_data = recv_data
        self.sent = bytearray()

    def sendall(self, data: bytes) -> None:
        """Append ``data`` to the ``sent`` record."""
        self.sent.extend(data)

    def recv(self, n: int, flags: int = 0) -> bytes:
        """Return up to ``n`` bytes of the canned buffer; ``flags`` is ignored.

        Once the buffer has been consumed, every further call returns ``b""``.
        """
        start = self._recv_offset
        stop = start + n
        # The cursor moves by the requested size, not by the amount returned;
        # slicing past the end of the buffer simply yields fewer (or no) bytes.
        self._recv_offset = stop
        return self._recv_data[start:stop]

    @classmethod
    def with_response(cls, response: dict) -> "FakeSocket":
        """Build a FakeSocket whose read buffer holds ``response`` as a framed message.

        The frame is a 4-byte ``"!I"`` length header followed by the UTF-8
        encoded JSON document.
        """
        # NOTE(review): greetd-ipc(7) describes the length prefix as native
        # byte order, whereas "!I" is network order — confirm against ipc.py.
        body = json.dumps(response).encode("utf-8")
        header = struct.pack("!I", len(body))
        return cls(recv_data=header + body)
class FragmentingSocket:
    """Socket stand-in whose ``recv`` never returns more than ``chunk_size`` bytes.

    Used to imitate a stream that arrives in several short reads. Writes are
    recorded in ``sent``.
    """

    def __init__(self, data: bytes, chunk_size: int = 3):
        self._chunk_size = chunk_size
        self._offset = 0
        self._data = data
        self.sent = bytearray()

    def sendall(self, data: bytes) -> None:
        """Append ``data`` to the ``sent`` record."""
        self.sent.extend(data)

    def recv(self, n: int, flags: int = 0) -> bytes:
        """Return the next piece of data; ``flags`` is ignored.

        The piece is limited by the requested size ``n``, the configured chunk
        size, and the number of bytes still unread. ``b""`` is returned when
        that limit is zero or negative.
        """
        unread = len(self._data) - self._offset
        size = min(n, self._chunk_size, unread)
        if size <= 0:
            return b""
        begin = self._offset
        end = begin + size
        self._offset = end
        return self._data[begin:end]
class TestSendMessage:
    """Check that send_message writes a ``"!I"`` length header followed by JSON."""

    # NOTE(review): greetd-ipc(7) describes the length prefix as native byte
    # order, whereas "!I" is network order — confirm against ipc.py.

    def test_sends_length_prefixed_json(self) -> None:
        """The bytes on the wire equal header + UTF-8 JSON for a simple request."""
        fake = FakeSocket()
        message = {"type": "create_session", "username": "testuser"}

        send_message(fake, message)

        body = json.dumps(message).encode("utf-8")
        wire = struct.pack("!I", len(body)) + body
        assert bytes(fake.sent) == wire

    def test_sends_empty_dict(self) -> None:
        """An empty dict is framed the same way as any other message."""
        fake = FakeSocket()

        send_message(fake, {})

        body = json.dumps({}).encode("utf-8")
        wire = struct.pack("!I", len(body)) + body
        assert bytes(fake.sent) == wire

    def test_sends_nested_message(self) -> None:
        """The header value and the decoded body both match the sent message."""
        fake = FakeSocket()
        message = {"type": "post_auth_message_response", "response": "secret123"}

        send_message(fake, message)

        # Split the frame: first four bytes are the header, the rest is JSON.
        (declared_length,) = struct.unpack("!I", bytes(fake.sent[:4]))
        roundtripped = json.loads(fake.sent[4:])
        assert declared_length == len(json.dumps(message).encode("utf-8"))
        assert roundtripped == message
class TestRecvMessage:
    """Exercise recv_message with well-formed, empty, fragmented and oversized input."""

    def test_receives_valid_message(self) -> None:
        """A framed single-key message is decoded back to the original dict."""
        expected = {"type": "success"}
        fake = FakeSocket.with_response(expected)
        assert recv_message(fake) == expected

    def test_receives_complex_message(self) -> None:
        """A framed multi-key message is decoded back to the original dict."""
        expected = {
            "type": "auth_message",
            "auth_message_type": "secret",
            "auth_message": "Password:",
        }
        fake = FakeSocket.with_response(expected)
        assert recv_message(fake) == expected

    def test_raises_on_empty_recv(self) -> None:
        """A socket that yields no bytes at all must raise ConnectionError."""
        fake = FakeSocket(recv_data=b"")
        with pytest.raises(ConnectionError):
            recv_message(fake)

    def test_receives_fragmented_data(self) -> None:
        """recv() may return fewer bytes than requested — must loop."""
        expected = {"type": "success"}
        body = json.dumps(expected).encode("utf-8")
        frame = struct.pack("!I", len(body)) + body
        # Deliver the frame three bytes at a time, so even the 4-byte header
        # needs more than one read.
        fake = FragmentingSocket(frame, chunk_size=3)
        assert recv_message(fake) == expected

    def test_rejects_oversized_payload(self) -> None:
        """Payloads exceeding the size limit must be rejected."""
        # Only the header is supplied: it announces a 10,000,000-byte payload.
        oversized_header = struct.pack("!I", 10_000_000)
        fake = FakeSocket(recv_data=oversized_header)
        with pytest.raises(ConnectionError, match="too large"):
            recv_message(fake)
class TestCreateSession:
    """Check the request create_session writes and the reply it returns."""

    def test_sends_create_session_request(self) -> None:
        """The username is sent in a create_session message; the reply is returned."""
        reply = {
            "type": "auth_message",
            "auth_message_type": "secret",
            "auth_message": "Password:",
        }
        fake = FakeSocket.with_response(reply)

        returned = create_session(fake, "dominik")

        # Decode exactly the number of bytes the header declares.
        (size,) = struct.unpack_from("!I", fake.sent)
        request = json.loads(fake.sent[4 : 4 + size])
        assert request == {"type": "create_session", "username": "dominik"}
        assert returned == reply
class TestPostAuthResponse:
    """Tests for posting authentication responses (passwords)."""

    def test_sends_password_response(self) -> None:
        """A string response is sent verbatim and the canned reply is returned."""
        response = {"type": "success"}
        sock = FakeSocket.with_response(response)

        result = post_auth_response(sock, "mypassword")

        # Decode exactly the number of bytes the header declares.
        length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
        sent_msg = json.loads(sock.sent[4 : 4 + length])
        assert sent_msg == {
            "type": "post_auth_message_response",
            "response": "mypassword",
        }
        assert result == response

    def test_sends_none_response(self) -> None:
        """For auth types that don't require a response."""
        response = {"type": "success"}
        sock = FakeSocket.with_response(response)

        result = post_auth_response(sock, None)

        length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
        sent_msg = json.loads(sock.sent[4 : 4 + length])
        assert sent_msg == {
            "type": "post_auth_message_response",
            "response": None,
        }
        # The return value was previously captured but never verified; check it
        # the same way the password test does.
        assert result == response
class TestStartSession:
    """Tests for starting a session after authentication."""

    def test_sends_start_session_request(self) -> None:
        """A single-element command list is sent as ``cmd``; the reply is returned."""
        response = {"type": "success"}
        sock = FakeSocket.with_response(response)

        result = start_session(sock, ["Hyprland"])

        # Decode exactly the number of bytes the header declares.
        length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
        sent_msg = json.loads(sock.sent[4 : 4 + length])
        assert sent_msg == {"type": "start_session", "cmd": ["Hyprland"]}
        assert result == response

    def test_sends_multi_arg_command(self) -> None:
        """A command with several arguments is sent as a list, element by element."""
        response = {"type": "success"}
        sock = FakeSocket.with_response(response)

        result = start_session(sock, ["sway", "--config", "/etc/sway/config"])

        length = struct.unpack("!I", bytes(sock.sent[:4]))[0]
        sent_msg = json.loads(sock.sent[4 : 4 + length])
        assert sent_msg == {
            "type": "start_session",
            "cmd": ["sway", "--config", "/etc/sway/config"],
        }
        # The return value was previously captured but never verified; check it
        # the same way the single-argument test does.
        assert result == response
class TestCancelSession:
    """Check the request cancel_session writes and the reply it returns."""

    def test_sends_cancel_session_request(self) -> None:
        """A bare cancel_session message is sent and the canned reply is returned."""
        reply = {"type": "success"}
        fake = FakeSocket.with_response(reply)

        returned = cancel_session(fake)

        # Decode exactly the number of bytes the header declares.
        (size,) = struct.unpack_from("!I", fake.sent)
        request = json.loads(fake.sent[4 : 4 + size])
        assert request == {"type": "cancel_session"}
        assert returned == reply