Rewrite moongreet from Python to Rust (v0.3.0)

Complete rewrite of the greetd greeter from Python/PyGObject to Rust/gtk4-rs
for consistency with moonset, single binary without Python runtime, and
improved security through Rust memory safety.

Modules: main, greeter, ipc, config, i18n, users, sessions, power
86 unit tests covering all modules including login_worker IPC flow.
Security hardening: shell-word splitting for exec_cmd, absolute path
validation for session binaries, session-name sanitization, absolute
loginctl path, atomic IPC writes.
This commit is contained in:
nevaforget 2026-03-27 22:08:33 +01:00
parent de0b1d40ba
commit 226bbb75e4
39 changed files with 4395 additions and 2768 deletions

11
.gitignore vendored
View File

@ -1,13 +1,4 @@
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
dist/
build/
.venv/
.pytest_cache/
.pyright/
*.egg
/target
# makepkg build artifacts
pkg/src/

View File

@ -4,48 +4,52 @@
## Projekt
Moongreet ist ein greetd-Greeter für Wayland, gebaut mit Python + GTK4 + gtk4-layer-shell.
Moongreet ist ein greetd-Greeter für Wayland, gebaut mit Rust + gtk4-rs + gtk4-layer-shell.
Teil des Moonarch-Ökosystems.
## Tech-Stack
- Python 3.11+, PyGObject (GTK 4.0)
- gtk4-layer-shell für Wayland Layer Shell
- Rust (Edition 2024), gtk4-rs 0.11, glib 0.22
- gtk4-layer-shell 0.8 für Wayland Layer Shell (TOP Layer)
- greetd IPC über Unix Domain Socket (length-prefixed JSON)
- pytest für Tests
- `cargo test` für Unit-Tests
## Projektstruktur
- `src/moongreet/` — Quellcode
- `src/moongreet/data/` — Package-Assets (Default-Avatar, Icons) — werden mit dem Wheel ausgeliefert
- `tests/` — pytest Tests
- `data/` — User-Assets (wallpaper.jpg) — nicht Teil des Packages
- `src/` — Rust-Quellcode (main.rs, greeter.rs, ipc.rs, config.rs, users.rs, sessions.rs, i18n.rs, power.rs)
- `resources/` — GResource-Assets (style.css, wallpaper.jpg, default-avatar.svg)
- `config/` — Beispiel-Konfigurationsdateien für `/etc/moongreet/` und `/etc/greetd/`
## Kommandos
```bash
# Tests ausführen
uv run pytest tests/ -v
cargo test
# Typ-Checks
uv run pyright src/
# Release-Build
cargo build --release
# Greeter starten (nur zum Testen, braucht normalerweise greetd)
uv run moongreet
LD_PRELOAD=/usr/lib/libgtk4-layer-shell.so ./target/release/moongreet
```
## Architektur
- `ipc.py` — greetd Socket-Kommunikation (length-prefixed JSON)
- `users.py` — Benutzer aus /etc/passwd, Avatare, GTK-Themes
- `sessions.py` — Wayland/X11 Sessions aus .desktop Files
- `power.py` — Reboot/Shutdown via loginctl
- `i18n.py` — Locale-Erkennung (LANG / /etc/locale.conf) und String-Tabellen (DE/EN)
- `greeter.py` — GTK4 UI (Overlay-Layout), Faillock-Warnung nach 2 Fehlversuchen, WallpaperWindow für Sekundärmonitore
- `main.py` — Entry Point, GTK App, Layer Shell Setup, Multi-Monitor-Orchestrierung
- `ipc.rs` — greetd Socket-Kommunikation (4-byte LE header + JSON)
- `users.rs` — Benutzer aus /etc/passwd, Avatare (AccountsService + ~/.face), Symlink-Rejection
- `sessions.rs` — Wayland/X11 Sessions aus .desktop Files
- `power.rs` — Reboot/Shutdown via loginctl
- `i18n.rs` — Locale-Erkennung (LANG / /etc/locale.conf) und String-Tabellen (DE/EN)
- `config.rs` — TOML-Config ([appearance] background, gtk-theme) + Wallpaper-Fallback
- `greeter.rs` — GTK4 UI (Overlay-Layout), Login-Flow via greetd IPC, Faillock-Warnung, Avatar-Cache, Last-User/Last-Session Persistence
- `main.rs` — Entry Point, GTK App, Layer Shell Setup, Multi-Monitor
- `resources/style.css` — Catppuccin-inspiriertes Theme
## Design Decisions
- **Synchrones I/O im GTK-Konstruktor**: `load_config`, `load_strings`, `get_users` und `get_sessions` laufen synchron in `GreeterWindow.__init__`. Async Loading mit Placeholder-UI wäre möglich, erhöht aber die Komplexität erheblich. Der Greeter startet 1x pro Boot auf lokaler Hardware — die Daten sind klein (passwd, locale.conf, wenige .desktop-Files), die Latenz im Normalfall vernachlässigbar.
- **Synchrones Avatar-Decoding**: `GdkPixbuf.Pixbuf.new_from_file_at_scale` läuft synchron auf dem Main Thread. Bei großen Bildern als `.face`-Datei kann die UI kurz stocken. Der Avatar-Cache (`_avatar_cache`) federt das nach dem ersten Laden ab. Async Decoding per Worker-Thread + `GLib.idle_add` wäre die Alternative, rechtfertigt aber den Aufwand nicht für einen Single-User-Greeter.
- **TOP Layer statt OVERLAY**: Greeter läuft unter greetd, nicht über Waybar
- **GResource-Bundle**: CSS, Wallpaper und Default-Avatar sind in die Binary kompiliert
- **Async Login**: `glib::spawn_future_local` + `gio::spawn_blocking` statt raw Threads
- **Socket-Cancellation**: `Arc<Mutex<Option<UnixStream>>>` + `AtomicBool` für saubere Abbrüche
- **Avatar-Cache**: `HashMap<String, gdk::Texture>` in `Rc<RefCell<GreeterState>>`
- **Symmetrie mit moonset**: Gleiche Patterns (i18n, config, users, power, GResource)

1294
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

25
Cargo.toml Normal file
View File

@ -0,0 +1,25 @@
[package]
name = "moongreet"
version = "0.3.0"
edition = "2024"
description = "A greetd greeter for Wayland with GTK4 and Layer Shell"
license = "MIT"
[dependencies]
gtk4 = { version = "0.11", features = ["v4_10"] }
gtk4-layer-shell = "0.8"
glib = "0.22"
gdk4 = "0.11"
gdk-pixbuf = "0.22"
gio = "0.22"
toml = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
log = "0.4"
env_logger = "0.11"
[dev-dependencies]
tempfile = "3"
[build-dependencies]
glib-build-tools = "0.22"

View File

@ -1,51 +1,58 @@
# Moongreet
A greetd greeter for Wayland, built with Python + GTK4 + gtk4-layer-shell.
A greetd greeter for Wayland, built with Rust + GTK4 + gtk4-layer-shell.
Part of the Moonarch ecosystem.
## Features
- **greetd IPC** — Communicates via `$GREETD_SOCK` (length-prefixed JSON)
- **User list** — Parsed from `/etc/passwd` (UID 100065533)
- **Avatars** — AccountsService icons, `~/.face` fallback, default SVG
- **Avatars** — AccountsService icons, `~/.face` fallback, default SVG with theme tinting
- **Sessions** — Discovered from `/usr/share/wayland-sessions/` and `/usr/share/xsessions/`
- **Last user** — Remembered in `/var/cache/moongreet/last-user`
- **Last user/session** — Remembered in `/var/cache/moongreet/`
- **Power actions** — Reboot / Shutdown via `loginctl`
- **Layer Shell** — Fullscreen via gtk4-layer-shell
- **Layer Shell** — Fullscreen via gtk4-layer-shell (TOP layer)
- **Multi-monitor** — Greeter on primary, wallpaper on all monitors
- **i18n** — German and English (auto-detected from system locale)
- **Faillock warning** — Warns after 2 failed attempts, locked message after 3
## Requirements
- Python 3.11+
- GTK 4, PyGObject
- GTK 4
- gtk4-layer-shell (for Wayland fullscreen)
- greetd
## Building
```bash
cargo build --release
```
## Installation
```bash
uv pip install .
```
# Install binary
sudo install -Dm755 target/release/moongreet /usr/bin/moongreet
## System Setup
1. Copy configuration:
```bash
# Install config
sudo mkdir -p /etc/moongreet
sudo cp config/moongreet.toml /etc/moongreet/moongreet.toml
```
2. Edit `/etc/moongreet/moongreet.toml` — set an absolute path for the wallpaper.
## System Setup
3. Create cache directory:
1. Edit `/etc/moongreet/moongreet.toml` — set an absolute path for the wallpaper.
2. Create cache directory:
```bash
sudo mkdir -p /var/cache/moongreet
sudo mkdir -p /var/cache/moongreet/last-session
sudo chown greeter:greeter /var/cache/moongreet
```
4. Configure greetd (`/etc/greetd/config.toml`):
3. Configure greetd (`/etc/greetd/config.toml`):
```ini
[default_session]
command = "moongreet"
command = "niri -c /etc/greetd/niri-greeter.kdl"
user = "greeter"
```
@ -53,13 +60,13 @@ uv pip install .
```bash
# Run tests
uv run pytest tests/ -v
cargo test
# Type checking
uv run pyright src/
# Build release
cargo build --release
# Run locally (without greetd)
uv run moongreet
# Run locally (without greetd, needs LD_PRELOAD for layer-shell)
LD_PRELOAD=/usr/lib/libgtk4-layer-shell.so ./target/release/moongreet
```
## License

10
build.rs Normal file
View File

@ -0,0 +1,10 @@
// ABOUTME: Build script for compiling GResource bundle.
// ABOUTME: Bundles style.css, wallpaper.jpg, and default-avatar.svg into the binary.
fn main() {
glib_build_tools::compile_resources(
&["resources"],
"resources/resources.gresource.xml",
"moongreet.gresource",
);
}

View File

@ -4,24 +4,20 @@
# Maintainer: Dominik Kressler
pkgname=moongreet-git
pkgver=0.2.0.r0.g64f08d7
pkgver=0.3.0.r0.g0000000
pkgrel=1
pkgdesc="A greetd greeter for Wayland, built with Python + GTK4 + gtk4-layer-shell"
arch=('any')
pkgdesc="A greetd greeter for Wayland with GTK4 and Layer Shell"
arch=('x86_64')
url="https://gitea.moonarch.de/nevaforget/greetd-moongreet"
license=('MIT')
depends=(
'python'
'python-gobject'
'gtk4'
'gtk4-layer-shell'
'greetd'
)
makedepends=(
'git'
'python-build'
'python-installer'
'python-hatchling'
'cargo'
)
provides=('moongreet')
conflicts=('moongreet')
@ -36,13 +32,12 @@ pkgver() {
build() {
cd "$srcdir/greetd-moongreet"
rm -rf dist/
python -m build --wheel --no-isolation
cargo build --release --locked
}
package() {
cd "$srcdir/greetd-moongreet"
python -m installer --destdir="$pkgdir" dist/*.whl
install -Dm755 target/release/moongreet "$pkgdir/usr/bin/moongreet"
# Greeter config
install -Dm644 config/moongreet.toml "$pkgdir/etc/moongreet/moongreet.toml"

View File

@ -1,30 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "moongreet"
version = "0.2.1"
description = "A greetd greeter for Wayland with GTK4"
requires-python = ">=3.11"
license = "MIT"
dependencies = [
"PyGObject>=3.46",
]
[project.scripts]
moongreet = "moongreet.main:main"
[tool.hatch.build.targets.wheel]
packages = ["src/moongreet"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
[tool.pyright]
pythonVersion = "3.11"
pythonPlatform = "Linux"
venvPath = "."
venv = ".venv"
typeCheckingMode = "standard"

View File

Before

Width:  |  Height:  |  Size: 2.6 KiB

After

Width:  |  Height:  |  Size: 2.6 KiB

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<gresources>
<gresource prefix="/dev/moonarch/moongreet">
<file>style.css</file>
<file>wallpaper.jpg</file>
<file>default-avatar.svg</file>
</gresource>
</gresources>

View File

@ -8,6 +8,11 @@ window.greeter {
background-position: center;
}
/* Wallpaper-only window for secondary monitors */
window.wallpaper {
background-color: #1a1a2e;
}
/* Central login area */
.login-box {
padding: 40px;

View File

Before

Width:  |  Height:  |  Size: 366 KiB

After

Width:  |  Height:  |  Size: 366 KiB

219
src/config.rs Normal file
View File

@ -0,0 +1,219 @@
// ABOUTME: Configuration loading for the greeter.
// ABOUTME: Reads moongreet.toml for wallpaper and GTK theme settings with fallback hierarchy.
use serde::Deserialize;
use std::fs;
use std::path::{Path, PathBuf};
const MOONARCH_WALLPAPER: &str = "/usr/share/moonarch/wallpaper.jpg";
const GRESOURCE_PREFIX: &str = "/dev/moonarch/moongreet";
/// Default config search path: system-wide config.
fn default_config_paths() -> Vec<PathBuf> {
vec![PathBuf::from("/etc/moongreet/moongreet.toml")]
}
/// Raw TOML structure for deserialization.
#[derive(Debug, Clone, Default, Deserialize)]
struct TomlConfig {
appearance: Option<Appearance>,
}
#[derive(Debug, Clone, Default, Deserialize)]
struct Appearance {
background: Option<String>,
#[serde(rename = "gtk-theme")]
gtk_theme: Option<String>,
}
/// Greeter configuration.
#[derive(Debug, Clone, Default)]
pub struct Config {
pub background_path: Option<String>,
pub gtk_theme: Option<String>,
}
/// Load config from TOML files. Later paths override earlier ones.
pub fn load_config(config_paths: Option<&[PathBuf]>) -> Config {
let default_paths = default_config_paths();
let paths = config_paths.unwrap_or(&default_paths);
let mut merged = Config::default();
for path in paths {
if let Ok(content) = fs::read_to_string(path) {
if let Ok(parsed) = toml::from_str::<TomlConfig>(&content) {
if let Some(appearance) = parsed.appearance {
if let Some(bg) = appearance.background {
// Resolve relative paths against config file directory
let bg_path = PathBuf::from(&bg);
if bg_path.is_absolute() {
merged.background_path = Some(bg);
} else if let Some(parent) = path.parent() {
merged.background_path =
Some(parent.join(&bg).to_string_lossy().to_string());
}
}
if appearance.gtk_theme.is_some() {
merged.gtk_theme = appearance.gtk_theme;
}
}
}
}
}
merged
}
/// Resolve the wallpaper path using the fallback hierarchy.
///
/// Priority: config background_path > Moonarch system default > gresource fallback.
pub fn resolve_background_path(config: &Config) -> PathBuf {
resolve_background_path_with(config, Path::new(MOONARCH_WALLPAPER))
}
/// Resolve with configurable moonarch wallpaper path (for testing).
pub fn resolve_background_path_with(config: &Config, moonarch_wallpaper: &Path) -> PathBuf {
// User-configured path
if let Some(ref bg) = config.background_path {
let path = PathBuf::from(bg);
if path.is_file() {
return path;
}
}
// Moonarch ecosystem default
if moonarch_wallpaper.is_file() {
return moonarch_wallpaper.to_path_buf();
}
// GResource fallback path (loaded from compiled resources at runtime)
PathBuf::from(format!("{GRESOURCE_PREFIX}/wallpaper.jpg"))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_config_has_none_fields() {
let config = Config::default();
assert!(config.background_path.is_none());
assert!(config.gtk_theme.is_none());
}
#[test]
fn load_config_returns_default_when_no_files_exist() {
let paths = vec![PathBuf::from("/nonexistent/moongreet.toml")];
let config = load_config(Some(&paths));
assert!(config.background_path.is_none());
assert!(config.gtk_theme.is_none());
}
#[test]
fn load_config_reads_appearance_section() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("moongreet.toml");
fs::write(
&conf,
"[appearance]\nbackground = \"/custom/wallpaper.jpg\"\ngtk-theme = \"catppuccin\"\n",
)
.unwrap();
let paths = vec![conf];
let config = load_config(Some(&paths));
assert_eq!(
config.background_path.as_deref(),
Some("/custom/wallpaper.jpg")
);
assert_eq!(config.gtk_theme.as_deref(), Some("catppuccin"));
}
#[test]
fn load_config_resolves_relative_background() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("moongreet.toml");
fs::write(&conf, "[appearance]\nbackground = \"bg.jpg\"\n").unwrap();
let paths = vec![conf];
let config = load_config(Some(&paths));
let expected = dir.path().join("bg.jpg").to_string_lossy().to_string();
assert_eq!(config.background_path.as_deref(), Some(expected.as_str()));
}
#[test]
fn load_config_later_paths_override_earlier() {
let dir = tempfile::tempdir().unwrap();
let conf1 = dir.path().join("first.toml");
let conf2 = dir.path().join("second.toml");
fs::write(
&conf1,
"[appearance]\nbackground = \"/first.jpg\"\ngtk-theme = \"first\"\n",
)
.unwrap();
fs::write(
&conf2,
"[appearance]\nbackground = \"/second.jpg\"\ngtk-theme = \"second\"\n",
)
.unwrap();
let paths = vec![conf1, conf2];
let config = load_config(Some(&paths));
assert_eq!(config.background_path.as_deref(), Some("/second.jpg"));
assert_eq!(config.gtk_theme.as_deref(), Some("second"));
}
#[test]
fn load_config_skips_missing_files() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("exists.toml");
fs::write(
&conf,
"[appearance]\nbackground = \"/exists.jpg\"\n",
)
.unwrap();
let paths = vec![PathBuf::from("/nonexistent.toml"), conf];
let config = load_config(Some(&paths));
assert_eq!(config.background_path.as_deref(), Some("/exists.jpg"));
}
#[test]
fn resolve_uses_config_path_when_file_exists() {
let dir = tempfile::tempdir().unwrap();
let wallpaper = dir.path().join("custom.jpg");
fs::write(&wallpaper, "fake").unwrap();
let config = Config {
background_path: Some(wallpaper.to_str().unwrap().to_string()),
gtk_theme: None,
};
assert_eq!(
resolve_background_path_with(&config, Path::new("/nonexistent")),
wallpaper
);
}
#[test]
fn resolve_ignores_config_path_when_file_missing() {
let config = Config {
background_path: Some("/nonexistent/wallpaper.jpg".to_string()),
gtk_theme: None,
};
let result = resolve_background_path_with(&config, Path::new("/nonexistent"));
assert!(result.to_str().unwrap().contains("moongreet"));
}
#[test]
fn resolve_uses_moonarch_wallpaper_as_second_fallback() {
let dir = tempfile::tempdir().unwrap();
let moonarch_wp = dir.path().join("wallpaper.jpg");
fs::write(&moonarch_wp, "fake").unwrap();
let config = Config::default();
assert_eq!(
resolve_background_path_with(&config, &moonarch_wp),
moonarch_wp
);
}
#[test]
fn resolve_uses_gresource_fallback_as_last_resort() {
let config = Config::default();
let result = resolve_background_path_with(&config, Path::new("/nonexistent"));
assert!(result.to_str().unwrap().contains("wallpaper.jpg"));
}
}

1415
src/greeter.rs Normal file

File diff suppressed because it is too large Load Diff

333
src/i18n.rs Normal file
View File

@ -0,0 +1,333 @@
// ABOUTME: Locale detection and string lookup for the greeter UI.
// ABOUTME: Reads system locale (LANG or /etc/locale.conf) and provides DE or EN strings.
use std::env;
use std::fs;
use std::path::Path;
const DEFAULT_LOCALE_CONF: &str = "/etc/locale.conf";
/// All user-visible strings for the greeter UI.
#[derive(Debug, Clone)]
pub struct Strings {
// UI labels
pub password_placeholder: &'static str,
pub reboot_tooltip: &'static str,
pub shutdown_tooltip: &'static str,
// Error messages
pub no_session_selected: &'static str,
pub greetd_sock_not_set: &'static str,
pub greetd_sock_not_absolute: &'static str,
pub greetd_sock_not_socket: &'static str,
pub greetd_sock_unreachable: &'static str,
pub auth_failed: &'static str,
pub wrong_password: &'static str,
pub multi_stage_unsupported: &'static str,
pub invalid_session_command: &'static str,
pub session_start_failed: &'static str,
pub reboot_failed: &'static str,
pub shutdown_failed: &'static str,
pub connection_error: &'static str,
pub socket_error: &'static str,
// Templates (use .replace("{n}", &count.to_string()))
pub faillock_attempts_remaining: &'static str,
pub faillock_locked: &'static str,
}
const STRINGS_DE: Strings = Strings {
password_placeholder: "Passwort",
reboot_tooltip: "Neustart",
shutdown_tooltip: "Herunterfahren",
no_session_selected: "Keine Session ausgewählt",
greetd_sock_not_set: "GREETD_SOCK nicht gesetzt",
greetd_sock_not_absolute: "GREETD_SOCK ist kein absoluter Pfad",
greetd_sock_not_socket: "GREETD_SOCK zeigt nicht auf einen Socket",
greetd_sock_unreachable: "GREETD_SOCK nicht erreichbar",
auth_failed: "Authentifizierung fehlgeschlagen",
wrong_password: "Falsches Passwort",
multi_stage_unsupported: "Mehrstufige Authentifizierung wird nicht unterstützt",
invalid_session_command: "Ungültiger Session-Befehl",
session_start_failed: "Session konnte nicht gestartet werden",
reboot_failed: "Neustart fehlgeschlagen",
shutdown_failed: "Herunterfahren fehlgeschlagen",
connection_error: "Verbindungsfehler",
socket_error: "Socket-Fehler",
faillock_attempts_remaining: "Noch {n} Versuch(e) vor Kontosperrung!",
faillock_locked: "Konto ist möglicherweise gesperrt",
};
const STRINGS_EN: Strings = Strings {
password_placeholder: "Password",
reboot_tooltip: "Reboot",
shutdown_tooltip: "Shut down",
no_session_selected: "No session selected",
greetd_sock_not_set: "GREETD_SOCK not set",
greetd_sock_not_absolute: "GREETD_SOCK is not an absolute path",
greetd_sock_not_socket: "GREETD_SOCK does not point to a socket",
greetd_sock_unreachable: "GREETD_SOCK unreachable",
auth_failed: "Authentication failed",
wrong_password: "Wrong password",
multi_stage_unsupported: "Multi-stage authentication is not supported",
invalid_session_command: "Invalid session command",
session_start_failed: "Failed to start session",
reboot_failed: "Reboot failed",
shutdown_failed: "Shutdown failed",
connection_error: "Connection error",
socket_error: "Socket error",
faillock_attempts_remaining: "{n} attempt(s) remaining before lockout!",
faillock_locked: "Account may be locked",
};
/// Extract the language prefix from a LANG value like "de_DE.UTF-8" → "de".
/// Returns "en" for empty, "C", or "POSIX" values.
fn parse_lang_prefix(lang: &str) -> String {
if lang.is_empty() || lang == "C" || lang == "POSIX" {
return "en".to_string();
}
let prefix = lang
.split('_')
.next()
.unwrap_or(lang)
.split('.')
.next()
.unwrap_or(lang)
.to_lowercase();
if prefix.chars().all(|c| c.is_ascii_alphabetic()) && !prefix.is_empty() {
prefix
} else {
"en".to_string()
}
}
/// Read the LANG= value from a locale.conf file.
fn read_lang_from_conf(path: &Path) -> Option<String> {
let content = fs::read_to_string(path).ok()?;
for line in content.lines() {
if let Some(value) = line.strip_prefix("LANG=") {
let value = value.trim();
if !value.is_empty() {
return Some(value.to_string());
}
}
}
None
}
/// Determine the system language from LANG env var or /etc/locale.conf.
pub fn detect_locale() -> String {
let lang = env::var("LANG")
.ok()
.filter(|s| !s.is_empty())
.or_else(|| read_lang_from_conf(Path::new(DEFAULT_LOCALE_CONF)));
match lang {
Some(l) => parse_lang_prefix(&l),
None => "en".to_string(),
}
}
/// Return the string table for the given locale, defaulting to English.
pub fn load_strings(locale: Option<&str>) -> &'static Strings {
let locale = match locale {
Some(l) => l.to_string(),
None => detect_locale(),
};
match locale.as_str() {
"de" => &STRINGS_DE,
_ => &STRINGS_EN,
}
}
/// Format a faillock warning for the given attempt count.
/// Returns None if no warning is needed yet.
pub fn faillock_warning(attempt_count: u32, strings: &Strings) -> Option<String> {
const FAILLOCK_MAX_ATTEMPTS: u32 = 3;
if attempt_count >= FAILLOCK_MAX_ATTEMPTS {
return Some(strings.faillock_locked.to_string());
}
let remaining = FAILLOCK_MAX_ATTEMPTS - attempt_count;
if remaining == 1 {
return Some(
strings
.faillock_attempts_remaining
.replace("{n}", &remaining.to_string()),
);
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
// -- parse_lang_prefix tests --
#[test]
fn parse_german_locale() {
assert_eq!(parse_lang_prefix("de_DE.UTF-8"), "de");
}
#[test]
fn parse_english_locale() {
assert_eq!(parse_lang_prefix("en_US.UTF-8"), "en");
}
#[test]
fn parse_c_falls_back_to_english() {
assert_eq!(parse_lang_prefix("C"), "en");
}
#[test]
fn parse_posix_falls_back_to_english() {
assert_eq!(parse_lang_prefix("POSIX"), "en");
}
#[test]
fn parse_empty_falls_back_to_english() {
assert_eq!(parse_lang_prefix(""), "en");
}
#[test]
fn parse_unsupported_returns_prefix() {
assert_eq!(parse_lang_prefix("fr_FR.UTF-8"), "fr");
}
#[test]
fn parse_bare_language_code() {
assert_eq!(parse_lang_prefix("de"), "de");
}
// -- read_lang_from_conf tests --
#[test]
fn read_conf_extracts_lang() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("locale.conf");
let mut f = fs::File::create(&conf).unwrap();
writeln!(f, "LANG=de_DE.UTF-8").unwrap();
assert_eq!(read_lang_from_conf(&conf), Some("de_DE.UTF-8".to_string()));
}
#[test]
fn read_conf_returns_none_for_missing_file() {
assert_eq!(
read_lang_from_conf(Path::new("/nonexistent/locale.conf")),
None
);
}
#[test]
fn read_conf_returns_none_for_empty_lang() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("locale.conf");
let mut f = fs::File::create(&conf).unwrap();
writeln!(f, "LANG=").unwrap();
assert_eq!(read_lang_from_conf(&conf), None);
}
#[test]
fn read_conf_skips_non_lang_lines() {
let dir = tempfile::tempdir().unwrap();
let conf = dir.path().join("locale.conf");
let mut f = fs::File::create(&conf).unwrap();
writeln!(f, "LC_ALL=en_US.UTF-8").unwrap();
writeln!(f, "LANG=de_DE.UTF-8").unwrap();
assert_eq!(read_lang_from_conf(&conf), Some("de_DE.UTF-8".to_string()));
}
// -- load_strings tests --
#[test]
fn load_strings_german() {
let strings = load_strings(Some("de"));
assert_eq!(strings.password_placeholder, "Passwort");
assert_eq!(strings.reboot_tooltip, "Neustart");
}
#[test]
fn load_strings_english() {
let strings = load_strings(Some("en"));
assert_eq!(strings.password_placeholder, "Password");
assert_eq!(strings.reboot_tooltip, "Reboot");
}
#[test]
fn load_strings_unknown_falls_back_to_english() {
let strings = load_strings(Some("fr"));
assert_eq!(strings.password_placeholder, "Password");
}
#[test]
fn all_string_fields_nonempty() {
for locale in &["de", "en"] {
let s = load_strings(Some(locale));
assert!(!s.password_placeholder.is_empty(), "{locale}: password_placeholder");
assert!(!s.reboot_tooltip.is_empty(), "{locale}: reboot_tooltip");
assert!(!s.shutdown_tooltip.is_empty(), "{locale}: shutdown_tooltip");
assert!(!s.no_session_selected.is_empty(), "{locale}: no_session_selected");
assert!(!s.greetd_sock_not_set.is_empty(), "{locale}: greetd_sock_not_set");
assert!(!s.auth_failed.is_empty(), "{locale}: auth_failed");
assert!(!s.wrong_password.is_empty(), "{locale}: wrong_password");
assert!(!s.reboot_failed.is_empty(), "{locale}: reboot_failed");
assert!(!s.shutdown_failed.is_empty(), "{locale}: shutdown_failed");
assert!(!s.faillock_attempts_remaining.is_empty(), "{locale}: faillock_attempts_remaining");
assert!(!s.faillock_locked.is_empty(), "{locale}: faillock_locked");
}
}
// -- faillock_warning tests --
#[test]
fn faillock_no_warning_at_zero_attempts() {
let s = load_strings(Some("en"));
assert!(faillock_warning(0, s).is_none());
}
#[test]
fn faillock_no_warning_at_first_attempt() {
let s = load_strings(Some("en"));
assert!(faillock_warning(1, s).is_none());
}
#[test]
fn faillock_warning_at_second_attempt() {
let s = load_strings(Some("en"));
let warning = faillock_warning(2, s);
assert!(warning.is_some());
assert!(warning.unwrap().contains("1"));
}
#[test]
fn faillock_locked_at_third_attempt() {
let s = load_strings(Some("en"));
let warning = faillock_warning(3, s);
assert!(warning.is_some());
assert_eq!(warning.unwrap(), "Account may be locked");
}
#[test]
fn faillock_locked_beyond_max() {
let s = load_strings(Some("en"));
let warning = faillock_warning(5, s);
assert!(warning.is_some());
assert_eq!(warning.unwrap(), "Account may be locked");
}
#[test]
fn faillock_german_strings() {
let s = load_strings(Some("de"));
let warning = faillock_warning(2, s).unwrap();
assert!(warning.contains("Kontosperrung"));
let locked = faillock_warning(3, s).unwrap();
assert!(locked.contains("gesperrt"));
}
}

294
src/ipc.rs Normal file
View File

@ -0,0 +1,294 @@
// ABOUTME: greetd IPC protocol implementation — communicates via Unix socket.
// ABOUTME: Uses length-prefixed JSON encoding as specified by the greetd IPC protocol.
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;
const MAX_PAYLOAD_SIZE: usize = 65536;
/// Errors from greetd IPC communication.
#[derive(Debug)]
pub enum IpcError {
Io(io::Error),
PayloadTooLarge(usize),
Json(serde_json::Error),
ConnectionClosed,
}
impl std::fmt::Display for IpcError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IpcError::Io(e) => write!(f, "IPC I/O error: {e}"),
IpcError::PayloadTooLarge(size) => {
write!(f, "Payload too large: {size} bytes (max {MAX_PAYLOAD_SIZE})")
}
IpcError::Json(e) => write!(f, "IPC JSON error: {e}"),
IpcError::ConnectionClosed => write!(f, "Connection closed while reading data"),
}
}
}
impl std::error::Error for IpcError {}
impl From<io::Error> for IpcError {
fn from(e: io::Error) -> Self {
IpcError::Io(e)
}
}
impl From<serde_json::Error> for IpcError {
fn from(e: serde_json::Error) -> Self {
IpcError::Json(e)
}
}
/// Read exactly 4 bytes (length header) from the stream into a stack array.
fn recv_header(stream: &mut UnixStream) -> Result<[u8; 4], IpcError> {
let mut buf = [0u8; 4];
let mut filled = 0;
while filled < 4 {
let bytes_read = stream.read(&mut buf[filled..])?;
if bytes_read == 0 {
return Err(IpcError::ConnectionClosed);
}
filled += bytes_read;
}
Ok(buf)
}
/// Receive exactly n bytes from the stream, looping on partial reads.
fn recv_payload(stream: &mut UnixStream, n: usize) -> Result<Vec<u8>, IpcError> {
let mut buf = vec![0u8; n];
let mut filled = 0;
while filled < n {
let bytes_read = stream.read(&mut buf[filled..])?;
if bytes_read == 0 {
return Err(IpcError::ConnectionClosed);
}
filled += bytes_read;
}
Ok(buf)
}
/// Send a length-prefixed JSON message to the greetd socket.
/// Header and payload are sent in a single write for atomicity.
pub fn send_message(
stream: &mut UnixStream,
msg: &serde_json::Value,
) -> Result<(), IpcError> {
let payload = serde_json::to_vec(msg)?;
if payload.len() > MAX_PAYLOAD_SIZE {
return Err(IpcError::PayloadTooLarge(payload.len()));
}
let header = (payload.len() as u32).to_le_bytes();
let mut buf = Vec::with_capacity(4 + payload.len());
buf.extend_from_slice(&header);
buf.extend_from_slice(&payload);
stream.write_all(&buf)?;
Ok(())
}
/// Receive a length-prefixed JSON message from the greetd socket.
pub fn recv_message(stream: &mut UnixStream) -> Result<serde_json::Value, IpcError> {
let header = recv_header(stream)?;
let length = u32::from_le_bytes(header) as usize;
if length > MAX_PAYLOAD_SIZE {
return Err(IpcError::PayloadTooLarge(length));
}
let payload = recv_payload(stream, length)?;
let value: serde_json::Value = serde_json::from_slice(&payload)?;
Ok(value)
}
/// Send a create_session request to greetd and return the response.
pub fn create_session(
stream: &mut UnixStream,
username: &str,
) -> Result<serde_json::Value, IpcError> {
let msg = serde_json::json!({
"type": "create_session",
"username": username,
});
send_message(stream, &msg)?;
recv_message(stream)
}
/// Send an authentication response (e.g. password) to greetd.
pub fn post_auth_response(
stream: &mut UnixStream,
response: Option<&str>,
) -> Result<serde_json::Value, IpcError> {
let msg = serde_json::json!({
"type": "post_auth_message_response",
"response": response,
});
send_message(stream, &msg)?;
recv_message(stream)
}
/// Send a start_session request to launch the user's session.
pub fn start_session(
stream: &mut UnixStream,
cmd: &[String],
) -> Result<serde_json::Value, IpcError> {
let msg = serde_json::json!({
"type": "start_session",
"cmd": cmd,
});
send_message(stream, &msg)?;
recv_message(stream)
}
/// Cancel the current authentication session.
pub fn cancel_session(stream: &mut UnixStream) -> Result<serde_json::Value, IpcError> {
let msg = serde_json::json!({"type": "cancel_session"});
send_message(stream, &msg)?;
recv_message(stream)
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::net::UnixStream;
/// Create a connected pair of Unix sockets for testing.
fn socket_pair() -> (UnixStream, UnixStream) {
UnixStream::pair().unwrap()
}
#[test]
fn send_and_receive_message() {
let (mut client, mut server) = socket_pair();
let msg = serde_json::json!({"type": "create_session", "username": "test"});
send_message(&mut client, &msg).unwrap();
let received = recv_message(&mut server).unwrap();
assert_eq!(received["type"], "create_session");
assert_eq!(received["username"], "test");
}
#[test]
fn create_session_roundtrip() {
let (mut client, mut server) = socket_pair();
// Simulate greetd response in a thread
let handle = std::thread::spawn(move || {
let msg = recv_message(&mut server).unwrap();
assert_eq!(msg["type"], "create_session");
assert_eq!(msg["username"], "alice");
let response = serde_json::json!({
"type": "auth_message",
"auth_message_type": "visible",
"auth_message": "Password: ",
});
send_message(&mut server, &response).unwrap();
});
let response = create_session(&mut client, "alice").unwrap();
assert_eq!(response["type"], "auth_message");
handle.join().unwrap();
}
#[test]
fn post_auth_response_roundtrip() {
let (mut client, mut server) = socket_pair();
let handle = std::thread::spawn(move || {
let msg = recv_message(&mut server).unwrap();
assert_eq!(msg["type"], "post_auth_message_response");
assert_eq!(msg["response"], "secret123");
let response = serde_json::json!({"type": "success"});
send_message(&mut server, &response).unwrap();
});
let response = post_auth_response(&mut client, Some("secret123")).unwrap();
assert_eq!(response["type"], "success");
handle.join().unwrap();
}
#[test]
fn start_session_roundtrip() {
let (mut client, mut server) = socket_pair();
let handle = std::thread::spawn(move || {
let msg = recv_message(&mut server).unwrap();
assert_eq!(msg["type"], "start_session");
assert_eq!(msg["cmd"], serde_json::json!(["niri-session"]));
let response = serde_json::json!({"type": "success"});
send_message(&mut server, &response).unwrap();
});
let cmd = vec!["niri-session".to_string()];
let response = start_session(&mut client, &cmd).unwrap();
assert_eq!(response["type"], "success");
handle.join().unwrap();
}
#[test]
fn cancel_session_roundtrip() {
let (mut client, mut server) = socket_pair();
let handle = std::thread::spawn(move || {
let msg = recv_message(&mut server).unwrap();
assert_eq!(msg["type"], "cancel_session");
let response = serde_json::json!({"type": "success"});
send_message(&mut server, &response).unwrap();
});
let response = cancel_session(&mut client).unwrap();
assert_eq!(response["type"], "success");
handle.join().unwrap();
}
#[test]
fn connection_closed_returns_error() {
let (mut client, server) = socket_pair();
drop(server);
let result = recv_message(&mut client);
assert!(result.is_err());
}
#[test]
fn oversized_payload_rejected_on_send() {
let (mut client, _server) = socket_pair();
let big_string = "x".repeat(MAX_PAYLOAD_SIZE + 1);
let msg = serde_json::json!({"data": big_string});
let result = send_message(&mut client, &msg);
assert!(result.is_err());
}
#[test]
fn oversized_payload_rejected_on_receive() {
let (mut client, mut server) = socket_pair();
// Manually send a header claiming a huge payload
let fake_length: u32 = (MAX_PAYLOAD_SIZE as u32) + 1;
server.write_all(&fake_length.to_le_bytes()).unwrap();
let result = recv_message(&mut client);
assert!(matches!(result, Err(IpcError::PayloadTooLarge(_))));
}
#[test]
fn ipc_error_display() {
let err = IpcError::ConnectionClosed;
assert_eq!(err.to_string(), "Connection closed while reading data");
let err = IpcError::PayloadTooLarge(99999);
assert!(err.to_string().contains("99999"));
}
}

110
src/main.rs Normal file
View File

@ -0,0 +1,110 @@
// ABOUTME: Entry point for Moongreet — greetd greeter for Wayland.
// ABOUTME: Sets up GTK Application, Layer Shell, CSS, and multi-monitor windows.
mod config;
mod greeter;
mod i18n;
mod ipc;
mod power;
mod sessions;
mod users;
use gdk4 as gdk;
use gtk4::prelude::*;
use gtk4::{self as gtk, gio};
use gtk4_layer_shell::LayerShell;
use std::path::PathBuf;
fn load_css(display: &gdk::Display) {
let css_provider = gtk::CssProvider::new();
css_provider.load_from_resource("/dev/moonarch/moongreet/style.css");
gtk::style_context_add_provider_for_display(
display,
&css_provider,
gtk::STYLE_PROVIDER_PRIORITY_APPLICATION,
);
}
fn setup_layer_shell(window: &gtk::ApplicationWindow, keyboard: bool) {
window.init_layer_shell();
window.set_layer(gtk4_layer_shell::Layer::Top);
window.set_exclusive_zone(-1);
if keyboard {
window.set_keyboard_mode(gtk4_layer_shell::KeyboardMode::Exclusive);
}
// Anchor to all edges for fullscreen
window.set_anchor(gtk4_layer_shell::Edge::Top, true);
window.set_anchor(gtk4_layer_shell::Edge::Bottom, true);
window.set_anchor(gtk4_layer_shell::Edge::Left, true);
window.set_anchor(gtk4_layer_shell::Edge::Right, true);
}
fn activate(app: &gtk::Application) {
let display = match gdk::Display::default() {
Some(d) => d,
None => {
log::error!("No display available — cannot start greeter UI");
return;
}
};
load_css(&display);
// Load config and resolve wallpaper
let config = config::load_config(None);
let bg_path = config::resolve_background_path(&config);
// Main greeter window (login UI) — compositor picks focused monitor
let greeter_window = greeter::create_greeter_window(&bg_path, &config, app);
setup_layer_shell(&greeter_window, true);
greeter_window.present();
// Wallpaper-only windows on all monitors
let monitors = display.monitors();
for i in 0..monitors.n_items() {
if let Some(monitor) = monitors
.item(i)
.and_then(|obj| obj.downcast::<gdk::Monitor>().ok())
{
let wallpaper = greeter::create_wallpaper_window(&bg_path, app);
setup_layer_shell(&wallpaper, false);
wallpaper.set_monitor(Some(&monitor));
wallpaper.present();
}
}
}
fn setup_logging() {
let mut builder = env_logger::Builder::from_default_env();
builder.filter_level(log::LevelFilter::Info);
// Try file logging to /var/cache/moongreet/ — fall back to stderr
let log_dir = PathBuf::from("/var/cache/moongreet");
if log_dir.is_dir() {
let log_file = log_dir.join("moongreet.log");
if let Ok(file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_file)
{
builder.target(env_logger::Target::Pipe(Box::new(file)));
}
}
builder.init();
}
fn main() {
setup_logging();
log::info!("Moongreet starting");
// Register compiled GResources
gio::resources_register_include!("moongreet.gresource").expect("Failed to register resources");
let app = gtk::Application::builder()
.application_id("dev.moonarch.moongreet")
.build();
app.connect_activate(activate);
app.run();
}

View File

@ -1,2 +0,0 @@
# ABOUTME: Moongreet package — a greetd greeter for Wayland with GTK4.
# ABOUTME: Part of the Moonarch ecosystem.

View File

@ -1,84 +0,0 @@
# ABOUTME: Configuration loading from moongreet.toml.
# ABOUTME: Parses appearance and behavior settings with wallpaper path resolution.
import tomllib
from contextlib import AbstractContextManager
from dataclasses import dataclass
from importlib.resources import as_file, files
from pathlib import Path
DEFAULT_CONFIG_PATHS = [
Path("/etc/moongreet/moongreet.toml"),
]
@dataclass
class Config:
"""Greeter configuration loaded from moongreet.toml."""
background: Path | None = None
gtk_theme: str | None = None
def load_config(config_path: Path | None = None) -> Config:
"""Load configuration from a TOML file.
Relative paths in the config are resolved against the config file's directory.
"""
if config_path is None:
for path in DEFAULT_CONFIG_PATHS:
if path.exists():
config_path = path
break
if config_path is None:
return Config()
if not config_path.exists():
return Config()
try:
with open(config_path, "rb") as f:
data = tomllib.load(f)
except (tomllib.TOMLDecodeError, OSError):
return Config()
config = Config()
appearance = data.get("appearance", {})
bg = appearance.get("background")
if bg:
bg_path = Path(bg)
if not bg_path.is_absolute():
bg_path = config_path.parent / bg_path
config.background = bg_path
gtk_theme = appearance.get("gtk-theme")
if gtk_theme:
config.gtk_theme = gtk_theme
return config
_PACKAGE_DATA = files("moongreet") / "data"
_DEFAULT_WALLPAPER_PATH = _PACKAGE_DATA / "wallpaper.jpg"
def resolve_wallpaper_path(
config: Config,
) -> tuple[Path, AbstractContextManager | None]:
"""Resolve the wallpaper path from config or fall back to the package default.
Returns (path, context_manager). The context_manager is non-None when a
package resource was extracted to a temporary file the caller must keep
it alive and call __exit__ when done.
"""
if config.background and config.background.exists():
return config.background, None
ctx = as_file(_DEFAULT_WALLPAPER_PATH)
try:
path = ctx.__enter__()
except Exception:
ctx.__exit__(None, None, None)
raise
return path, ctx

View File

@ -1 +0,0 @@
<svg xmlns="http://www.w3.org/2000/svg" width="128" height="128" viewBox="0 0 512 512"><path fill="#222222" d="M256 23c-16.076 0-32.375 3.73-48.178 10.24c-2.126 6.525-3.877 14.76-4.877 23.754c-1.31 11.79-1.73 24.706-1.87 36.819c33.864-3.704 75.986-3.704 109.85 0c-.14-12.113-.56-25.03-1.87-36.82c-1-8.992-2.75-17.228-4.877-23.753C288.375 26.73 272.076 23 256 23m100.564 19.332c9.315 7.054 18.107 14.878 26.282 23.234c1.53-6.65 4.69-12.696 9.03-17.695zm-170.03 1.49c-34.675 20.22-65.047 52.714-82.552 86.334c-33.08 63.536-39.69 156.956-.53 214.8C132.786 388.278 200.276 405 256 405s123.215-16.72 152.547-60.045c39.162-57.843 32.55-151.263-.53-214.8c-17.504-33.62-47.876-66.112-82.55-86.333c.578 3.65 1.057 7.388 1.478 11.184c1.522 13.694 1.912 28.197 2.014 41.267C347.664 99.427 362 104 368 110c32 32 75.537 134.695 16 224c-37.654 56.48-218.346 56.48-256 0c-59.537-89.305-16-192 16-224c6-6 20.335-10.573 39.04-13.727c.103-13.07.493-27.573 2.015-41.267c.42-3.796.9-7.534 1.478-11.184zM64 48c-8.837 0-16 7.163-16 16a16 16 0 0 0 7 13.227V145.5L73 132V77.21A16 16 0 0 0 80 64c0-8.837-7.163-16-16-16m358.81 3.68c-12.81 0-23 10.19-23 23s10.19 23 23 23s23-10.19 23-23s-10.19-23-23-23m25.272 55.205c-6.98 5.497-15.758 8.795-25.27 8.795c-.745 0-1.48-.027-2.214-.067a217 217 0 0 1 2.38 4.37l29.852 22.39zm-238.822 2.5c-17.257.09-37.256 3.757-53.233 16.12c-26.634 20.608-43.034 114.763-33.49 146.763c16.584-61.767 31.993-124.02 107.92-161.274a133.5 133.5 0 0 0-21.197-1.61zm-135.055 44.21L40.15 179.138l-14.48 72.408l38.18 45.814c-10.947-46.523-5.776-98.723 10.355-143.764zm363.59 0c16.13 45.042 21.302 97.242 10.355 143.764l38.18-45.815l-14.48-72.408zM106.645 375.93c-3.583 1.17-7.252 3.406-10.282 6.435c-4.136 4.136-6.68 9.43-7.164 14.104c.21.364.603 1.157 1.73 2.162c2.453 2.188 6.693 5.17 12.127 8.358c10.867 6.38 26.55 13.757 44.205 20.623c21.177 8.237 45.35 15.704 67.738 20.38v-27.61c-39.47-5.12-79.897-18.325-108.355-44.452zm298.71 0C376.897 402.055 336.47 415.26 297 420.38v27.61c22.387-4.676 46.56-12.143 67.738-20.38c17.655-6.865 33.338-14.243 44.205-20.622c5.434-3.19 9.674-6.17 12.127-8.36c1.127-1.004 1.52-1.797 1.73-2.16c-.482-4.675-3.027-9.97-7.163-14.105c-3.03-3.03-6.7-5.264-10.282-6.435zM77.322 410.602L18 450.15V494h37v-18h18v18h366v-18h18v18h37v-43.85l-59.322-39.548c-.537.488-1.08.97-1.623 1.457c-3.922 3.497-8.932 6.89-14.998 10.452c-12.133 7.12-28.45 14.743-46.795 21.877C334.572 458.656 290.25 471 256 471s-78.572-12.343-115.262-26.61c-18.345-7.135-34.662-14.757-46.795-21.878c-6.066-3.56-11.076-6.955-14.998-10.453c-.543-.487-1.086-.97-1.623-1.458zM233 422.184v28.992c8.236 1.162 16.012 1.824 23 1.824s14.764-.662 23-1.824v-28.992a325 325 0 0 1-46 0"></path></svg>

Before

Width:  |  Height:  |  Size: 2.6 KiB

View File

@ -1,664 +0,0 @@
# ABOUTME: Main greeter window — builds the GTK4 UI layout for the Moongreet greeter.
# ABOUTME: Handles user selection, session choice, password entry, and power actions.
import logging
import os
import re
import shlex
import shutil
import socket
import stat
import subprocess
import threading
from importlib.resources import files
from pathlib import Path
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Gdk", "4.0")
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf
from moongreet.config import Config, load_config, resolve_wallpaper_path
from moongreet.i18n import load_strings, Strings
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
from moongreet.users import User, get_users, get_avatar_path
from moongreet.sessions import Session, get_sessions
from moongreet.power import reboot, shutdown
logger = logging.getLogger(__name__)
LAST_USER_PATH = Path("/var/cache/moongreet/last-user")
LAST_SESSION_DIR = Path("/var/cache/moongreet/last-session")
FAILLOCK_MAX_ATTEMPTS = 3
VALID_USERNAME = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]*$")
MAX_USERNAME_LENGTH = 256
PACKAGE_DATA = files("moongreet") / "data"
DEFAULT_AVATAR_PATH = PACKAGE_DATA / "default-avatar.svg"
AVATAR_SIZE = 128
MAX_AVATAR_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
def faillock_warning(attempt_count: int, strings: Strings | None = None) -> str | None:
"""Return a warning if the user is approaching or has reached the faillock limit."""
if strings is None:
strings = load_strings()
remaining = FAILLOCK_MAX_ATTEMPTS - attempt_count
if remaining <= 0:
return strings.faillock_locked
if remaining == 1:
return strings.faillock_attempts_remaining.format(n=remaining)
return None
def _build_wallpaper_widget(bg_path: Path | None) -> Gtk.Widget:
"""Create a wallpaper widget that fills the available space."""
if bg_path and bg_path.exists():
background = Gtk.Picture()
background.set_filename(str(bg_path))
background.set_content_fit(Gtk.ContentFit.COVER)
background.set_hexpand(True)
background.set_vexpand(True)
return background
background = Gtk.Box()
background.set_hexpand(True)
background.set_vexpand(True)
return background
class WallpaperWindow(Gtk.ApplicationWindow):
"""A window that shows only the wallpaper — used for secondary monitors."""
def __init__(self, bg_path: Path | None = None, **kwargs) -> None:
super().__init__(**kwargs)
self.add_css_class("greeter")
self.set_default_size(1920, 1080)
self.set_child(_build_wallpaper_widget(bg_path))
class GreeterWindow(Gtk.ApplicationWindow):
"""The main greeter window with login UI."""
def __init__(self, bg_path: Path | None = None, config: Config | None = None, **kwargs) -> None:
super().__init__(**kwargs)
self.add_css_class("greeter")
self.set_default_size(1920, 1080)
self._config = config if config is not None else load_config()
self._strings = load_strings()
self._users = get_users()
self._sessions = get_sessions()
self._selected_user: User | None = None
self._greetd_sock: socket.socket | None = None
self._greetd_sock_lock = threading.Lock()
self._login_cancelled = threading.Event()
self._default_avatar_pixbuf: GdkPixbuf.Pixbuf | None = None
self._avatar_cache: dict[str, GdkPixbuf.Pixbuf] = {}
self._failed_attempts: dict[str, int] = {}
self._bg_path = bg_path
self._apply_global_theme()
self._build_ui()
self._setup_keyboard_navigation()
# Defer initial user selection until the window is realized,
# so get_color() returns the actual theme foreground for SVG tinting
self.connect("realize", self._on_realize)
def _on_realize(self, widget: Gtk.Widget) -> None:
"""Called when the window is realized — select initial user.
Deferred from __init__ so get_color() returns actual theme values
for SVG tinting. Uses idle_add so the first frame renders before
avatar loading blocks the main loop.
"""
GLib.idle_add(self._select_initial_user)
def _build_ui(self) -> None:
"""Build the complete greeter UI layout."""
# Root overlay for layering
overlay = Gtk.Overlay()
self.set_child(overlay)
# Background wallpaper
overlay.set_child(_build_wallpaper_widget(self._bg_path))
# Main layout: 3 rows (top spacer, center login, bottom bar)
main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
main_box.set_hexpand(True)
main_box.set_vexpand(True)
overlay.add_overlay(main_box)
# Top spacer
top_spacer = Gtk.Box()
top_spacer.set_vexpand(True)
main_box.append(top_spacer)
# Center: login box
center_box = self._build_login_box()
center_box.set_halign(Gtk.Align.CENTER)
main_box.append(center_box)
# Bottom spacer
bottom_spacer = Gtk.Box()
bottom_spacer.set_vexpand(True)
main_box.append(bottom_spacer)
# Bottom bar overlay (user list left, power buttons right)
bottom_bar = self._build_bottom_bar()
bottom_bar.set_valign(Gtk.Align.END)
overlay.add_overlay(bottom_bar)
def _build_login_box(self) -> Gtk.Box:
"""Build the central login area with avatar, name, session, password."""
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
box.add_css_class("login-box")
box.set_halign(Gtk.Align.CENTER)
box.set_valign(Gtk.Align.CENTER)
box.set_spacing(12)
# Avatar — wrapped in a clipping frame for round shape
avatar_frame = Gtk.Box()
avatar_frame.set_size_request(AVATAR_SIZE, AVATAR_SIZE)
avatar_frame.set_halign(Gtk.Align.CENTER)
avatar_frame.set_overflow(Gtk.Overflow.HIDDEN)
avatar_frame.add_css_class("avatar")
self._avatar_image = Gtk.Image()
self._avatar_image.set_pixel_size(AVATAR_SIZE)
avatar_frame.append(self._avatar_image)
box.append(avatar_frame)
# Username label
self._username_label = Gtk.Label(label="")
self._username_label.add_css_class("username-label")
box.append(self._username_label)
# Session dropdown
self._session_dropdown = Gtk.DropDown()
self._session_dropdown.add_css_class("session-dropdown")
self._session_dropdown.set_hexpand(True)
if self._sessions:
session_names = [s.name for s in self._sessions]
string_list = Gtk.StringList.new(session_names)
self._session_dropdown.set_model(string_list)
box.append(self._session_dropdown)
# Password entry
self._password_entry = Gtk.PasswordEntry()
self._password_entry.set_hexpand(True)
self._password_entry.set_property("placeholder-text", self._strings.password_placeholder)
self._password_entry.set_property("show-peek-icon", True)
self._password_entry.add_css_class("password-entry")
self._password_entry.connect("activate", self._on_login_activate)
box.append(self._password_entry)
# Error label (hidden by default)
self._error_label = Gtk.Label(label="")
self._error_label.add_css_class("error-label")
self._error_label.set_visible(False)
box.append(self._error_label)
return box
def _build_bottom_bar(self) -> Gtk.Box:
"""Build the bottom bar with user list (left) and power buttons (right)."""
bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
bar.set_hexpand(True)
bar.set_margin_start(16)
bar.set_margin_end(16)
bar.set_margin_bottom(16)
# User list (left)
user_list_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
user_list_box.add_css_class("user-list")
user_list_box.set_halign(Gtk.Align.START)
user_list_box.set_valign(Gtk.Align.END)
for user in self._users:
btn = Gtk.Button(label=user.display_name)
btn.add_css_class("user-list-item")
btn.connect("clicked", self._on_user_clicked, user)
user_list_box.append(btn)
bar.append(user_list_box)
# Spacer
spacer = Gtk.Box()
spacer.set_hexpand(True)
bar.append(spacer)
# Power buttons (right)
power_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
power_box.set_halign(Gtk.Align.END)
power_box.set_valign(Gtk.Align.END)
power_box.set_spacing(8)
reboot_btn = Gtk.Button()
reboot_btn.set_icon_name("system-reboot-symbolic")
reboot_btn.add_css_class("power-button")
reboot_btn.set_tooltip_text(self._strings.reboot_tooltip)
reboot_btn.connect("clicked", self._on_reboot_clicked)
power_box.append(reboot_btn)
shutdown_btn = Gtk.Button()
shutdown_btn.set_icon_name("system-shutdown-symbolic")
shutdown_btn.add_css_class("power-button")
shutdown_btn.set_tooltip_text(self._strings.shutdown_tooltip)
shutdown_btn.connect("clicked", self._on_shutdown_clicked)
power_box.append(shutdown_btn)
bar.append(power_box)
return bar
def _select_initial_user(self) -> bool:
"""Select the last user or the first available user.
Returns False to deregister from GLib.idle_add after a single invocation.
"""
if not self._users:
return False
# Try to load last user
last_username = self._load_last_user()
target_user = None
if last_username:
for user in self._users:
if user.username == last_username:
target_user = user
break
if target_user is None:
target_user = self._users[0]
self._switch_to_user(target_user)
return False
def _switch_to_user(self, user: User) -> None:
"""Update the UI to show the selected user."""
self._selected_user = user
self._username_label.set_text(user.display_name)
self._password_entry.set_text("")
self._error_label.set_visible(False)
# Update avatar (use cache if available)
if user.username in self._avatar_cache:
self._avatar_image.set_from_pixbuf(self._avatar_cache[user.username])
else:
avatar_path = get_avatar_path(
user.username, home_dir=user.home
)
if avatar_path and avatar_path.exists():
self._set_avatar_from_file(avatar_path, user.username)
else:
# Default avatar — _set_default_avatar uses Traversable.read_text()
# which works in ZIP wheels too, no exists() check needed
self._set_default_avatar()
# Pre-select last used session for this user
self._select_last_session(user)
# Focus password entry
self._password_entry.grab_focus()
def _apply_global_theme(self) -> None:
"""Apply the GTK theme from moongreet.toml configuration."""
theme_name = self._config.gtk_theme
if not theme_name:
return
settings = Gtk.Settings.get_default()
if settings is None:
return
settings.set_property("gtk-theme-name", theme_name)
def _get_foreground_color(self) -> str:
"""Get the current GTK theme foreground color as a hex string."""
rgba = self.get_color()
r = int(rgba.red * 255)
g = int(rgba.green * 255)
b = int(rgba.blue * 255)
return f"#{r:02x}{g:02x}{b:02x}"
def _set_default_avatar(self) -> None:
"""Load the default avatar SVG, tinted with the GTK foreground color."""
if self._default_avatar_pixbuf:
self._avatar_image.set_from_pixbuf(self._default_avatar_pixbuf)
return
try:
svg_text = DEFAULT_AVATAR_PATH.read_text()
fg_color = self._get_foreground_color()
svg_text = svg_text.replace("#PLACEHOLDER", fg_color)
svg_bytes = svg_text.encode("utf-8")
loader = GdkPixbuf.PixbufLoader.new_with_type("svg")
loader.set_size(AVATAR_SIZE, AVATAR_SIZE)
loader.write(svg_bytes)
loader.close()
pixbuf = loader.get_pixbuf()
if pixbuf:
self._default_avatar_pixbuf = pixbuf
self._avatar_image.set_from_pixbuf(pixbuf)
except (GLib.Error, OSError):
self._avatar_image.set_from_icon_name("avatar-default-symbolic")
def _set_avatar_from_file(self, path: Path, username: str | None = None) -> None:
"""Load an image file and set it as the avatar, scaled to AVATAR_SIZE."""
try:
if path.stat().st_size > MAX_AVATAR_FILE_SIZE:
self._avatar_image.set_from_icon_name("avatar-default-symbolic")
return
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
str(path), AVATAR_SIZE, AVATAR_SIZE, True
)
if username:
self._avatar_cache[username] = pixbuf
self._avatar_image.set_from_pixbuf(pixbuf)
except GLib.Error:
self._avatar_image.set_from_icon_name("avatar-default-symbolic")
def _setup_keyboard_navigation(self) -> None:
"""Set up keyboard shortcuts."""
controller = Gtk.EventControllerKey()
controller.connect("key-pressed", self._on_key_pressed)
self.add_controller(controller)
def _on_key_pressed(
self,
controller: Gtk.EventControllerKey,
keyval: int,
keycode: int,
state: Gdk.ModifierType,
) -> bool:
"""Handle global key presses."""
if keyval == Gdk.KEY_Escape:
self._password_entry.set_text("")
self._error_label.set_visible(False)
return True
return False
def _on_user_clicked(self, button: Gtk.Button, user: User) -> None:
"""Handle user selection from the user list."""
self._cancel_pending_session()
self._switch_to_user(user)
def _on_login_activate(self, entry: Gtk.PasswordEntry) -> None:
"""Handle Enter key in the password field — attempt login."""
if not self._selected_user:
return
password = entry.get_text()
session = self._get_selected_session()
if not session:
self._show_error(self._strings.no_session_selected)
return
self._attempt_login(self._selected_user, password, session)
def _validate_greetd_sock(self, sock_path: str) -> bool:
"""Validate that GREETD_SOCK points to an absolute path and a real socket."""
path = Path(sock_path)
if not path.is_absolute():
self._show_error(self._strings.greetd_sock_not_absolute)
return False
try:
mode = path.stat().st_mode
if not stat.S_ISSOCK(mode):
self._show_error(self._strings.greetd_sock_not_socket)
return False
except OSError:
self._show_error(self._strings.greetd_sock_unreachable)
return False
return True
def _close_greetd_sock(self) -> None:
"""Close the greetd socket and reset the reference."""
with self._greetd_sock_lock:
if self._greetd_sock:
try:
self._greetd_sock.close()
except OSError:
pass
self._greetd_sock = None
def _set_login_sensitive(self, sensitive: bool) -> None:
"""Enable or disable login controls during authentication."""
self._password_entry.set_sensitive(sensitive)
self._session_dropdown.set_sensitive(sensitive)
def _attempt_login(self, user: User, password: str, session: Session) -> None:
"""Attempt to authenticate and start a session via greetd IPC."""
sock_path = os.environ.get("GREETD_SOCK")
if not sock_path:
self._show_error(self._strings.greetd_sock_not_set)
return
if not self._validate_greetd_sock(sock_path):
return
# Disable UI while authenticating — the IPC runs in a background thread
self._login_cancelled.clear()
self._set_login_sensitive(False)
thread = threading.Thread(
target=self._login_worker,
args=(user, password, session, sock_path),
daemon=True,
)
thread.start()
def _login_worker(self, user: User, password: str, session: Session, sock_path: str) -> None:
"""Run greetd IPC in a background thread to avoid blocking the GTK main loop."""
try:
if self._login_cancelled.is_set():
return
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10.0)
sock.connect(sock_path)
with self._greetd_sock_lock:
self._greetd_sock = sock
# Step 1: Create session — if a stale session exists, cancel it and retry
response = create_session(sock, user.username)
if self._login_cancelled.is_set():
return
if response.get("type") == "error":
cancel_session(sock)
response = create_session(sock, user.username)
if self._login_cancelled.is_set():
return
if response.get("type") == "error":
GLib.idle_add(self._on_login_error, response, self._strings.auth_failed)
return
# Step 2: Send password if auth message received
if response.get("type") == "auth_message":
response = post_auth_response(sock, password)
if self._login_cancelled.is_set():
return
if response.get("type") == "error":
self._failed_attempts[user.username] = self._failed_attempts.get(user.username, 0) + 1
warning = faillock_warning(self._failed_attempts[user.username], self._strings)
cancel_session(sock)
GLib.idle_add(self._on_login_auth_error, response, warning)
return
if response.get("type") == "auth_message":
# Multi-stage auth (e.g. TOTP) is not supported
cancel_session(sock)
GLib.idle_add(self._on_login_error, None, self._strings.multi_stage_unsupported)
return
# Step 3: Start session
if response.get("type") == "success":
cmd = shlex.split(session.exec_cmd)
if not cmd or not shutil.which(cmd[0]):
cancel_session(sock)
GLib.idle_add(self._on_login_error, None, self._strings.invalid_session_command)
return
response = start_session(sock, cmd)
if self._login_cancelled.is_set():
return
if response.get("type") == "success":
self._save_last_user(user.username)
self._save_last_session(user.username, session.name)
GLib.idle_add(self.get_application().quit)
return
else:
GLib.idle_add(self._on_login_error, response, self._strings.session_start_failed)
return
except (ConnectionError, OSError, ValueError) as e:
if self._login_cancelled.is_set():
# Socket was closed by _cancel_pending_session — exit silently
return
logger.error("greetd IPC error: %s", e)
if isinstance(e, ConnectionError):
GLib.idle_add(self._on_login_error, None, self._strings.connection_error)
else:
GLib.idle_add(self._on_login_error, None, self._strings.socket_error)
finally:
self._close_greetd_sock()
def _on_login_error(self, response: dict | None, message: str) -> None:
"""Handle login error on the GTK main thread."""
if response:
self._show_greetd_error(response, message)
else:
self._show_error(message)
self._set_login_sensitive(True)
def _on_login_auth_error(self, response: dict, warning: str | None) -> None:
"""Handle authentication failure with optional faillock warning on the GTK main thread."""
self._show_greetd_error(response, self._strings.wrong_password)
if warning:
current = self._error_label.get_text()
self._error_label.set_text(f"{current}\n{warning}")
self._set_login_sensitive(True)
def _cancel_pending_session(self) -> None:
"""Cancel any in-progress greetd session.
Sets the cancellation event and closes the socket to interrupt
any blocking I/O in the login worker. The worker checks the
event and exits silently instead of showing an error.
"""
self._login_cancelled.set()
self._close_greetd_sock()
def _get_selected_session(self) -> Session | None:
"""Get the currently selected session from the dropdown."""
if not self._sessions:
return None
idx = self._session_dropdown.get_selected()
if idx < len(self._sessions):
return self._sessions[idx]
return None
def _select_last_session(self, user: User) -> None:
"""Pre-select the last used session for a user in the dropdown."""
if not self._sessions:
return
last_session_name = self._load_last_session(user.username)
if not last_session_name:
return
for i, session in enumerate(self._sessions):
if session.name == last_session_name:
self._session_dropdown.set_selected(i)
return
MAX_GREETD_ERROR_LENGTH = 200
def _show_greetd_error(self, response: dict, fallback: str) -> None:
"""Display an error from greetd, using a fallback for missing or oversized descriptions."""
description = response.get("description", "")
if description and len(description) <= self.MAX_GREETD_ERROR_LENGTH:
self._show_error(description)
else:
self._show_error(fallback)
def _show_error(self, message: str) -> None:
"""Display an error message below the password field."""
self._error_label.set_text(message)
self._error_label.set_visible(True)
self._password_entry.set_text("")
self._password_entry.grab_focus()
def _on_reboot_clicked(self, button: Gtk.Button) -> None:
"""Handle reboot button click."""
button.set_sensitive(False)
threading.Thread(
target=self._power_worker, args=(reboot, self._strings.reboot_failed),
daemon=True,
).start()
def _on_shutdown_clicked(self, button: Gtk.Button) -> None:
"""Handle shutdown button click."""
button.set_sensitive(False)
threading.Thread(
target=self._power_worker, args=(shutdown, self._strings.shutdown_failed),
daemon=True,
).start()
def _power_worker(self, action, error_msg: str) -> None:
"""Run a power action in a background thread to avoid blocking the GTK main loop."""
try:
action()
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
GLib.idle_add(self._show_error, error_msg)
@staticmethod
def _load_last_user() -> str | None:
"""Load the last logged-in username from cache."""
if LAST_USER_PATH.exists():
try:
username = LAST_USER_PATH.read_text().strip()
except OSError:
return None
if len(username) > MAX_USERNAME_LENGTH or not VALID_USERNAME.match(username):
return None
return username
return None
@staticmethod
def _save_last_user(username: str) -> None:
"""Save the last logged-in username to cache."""
try:
LAST_USER_PATH.parent.mkdir(parents=True, exist_ok=True)
LAST_USER_PATH.write_text(username)
except OSError:
pass # Non-critical — cache dir may not be writable
MAX_SESSION_NAME_LENGTH = 256
@staticmethod
def _save_last_session(username: str, session_name: str) -> None:
"""Save the last used session name for a user to cache."""
if not VALID_USERNAME.match(username) or len(username) > MAX_USERNAME_LENGTH:
return
if not session_name or len(session_name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
return
try:
LAST_SESSION_DIR.mkdir(parents=True, exist_ok=True)
(LAST_SESSION_DIR / username).write_text(session_name)
except OSError:
pass # Non-critical — cache dir may not be writable
@staticmethod
def _load_last_session(username: str) -> str | None:
"""Load the last used session name for a user from cache."""
session_file = LAST_SESSION_DIR / username
if not session_file.exists():
return None
try:
name = session_file.read_text().strip()
except OSError:
return None
if not name or len(name) > GreeterWindow.MAX_SESSION_NAME_LENGTH:
return None
return name

View File

@ -1,117 +0,0 @@
# ABOUTME: Locale detection and string lookup for the greeter UI.
# ABOUTME: Reads system locale (LANG or /etc/locale.conf) and provides DE or EN strings.
import os
from dataclasses import dataclass
from pathlib import Path
DEFAULT_LOCALE_CONF = Path("/etc/locale.conf")
@dataclass(frozen=True)
class Strings:
"""All user-visible strings for the greeter UI."""
# UI labels
password_placeholder: str
reboot_tooltip: str
shutdown_tooltip: str
# Error messages
no_session_selected: str
greetd_sock_not_set: str
greetd_sock_not_absolute: str
greetd_sock_not_socket: str
greetd_sock_unreachable: str
auth_failed: str
wrong_password: str
multi_stage_unsupported: str
invalid_session_command: str
session_start_failed: str
reboot_failed: str
shutdown_failed: str
# Error messages (continued)
connection_error: str
socket_error: str
# Templates (use .format())
faillock_attempts_remaining: str
faillock_locked: str
_STRINGS_DE = Strings(
password_placeholder="Passwort",
reboot_tooltip="Neustart",
shutdown_tooltip="Herunterfahren",
no_session_selected="Keine Session ausgewählt",
greetd_sock_not_set="GREETD_SOCK nicht gesetzt",
greetd_sock_not_absolute="GREETD_SOCK ist kein absoluter Pfad",
greetd_sock_not_socket="GREETD_SOCK zeigt nicht auf einen Socket",
greetd_sock_unreachable="GREETD_SOCK nicht erreichbar",
auth_failed="Authentifizierung fehlgeschlagen",
wrong_password="Falsches Passwort",
multi_stage_unsupported="Mehrstufige Authentifizierung wird nicht unterstützt",
invalid_session_command="Ungültiger Session-Befehl",
session_start_failed="Session konnte nicht gestartet werden",
reboot_failed="Neustart fehlgeschlagen",
shutdown_failed="Herunterfahren fehlgeschlagen",
connection_error="Verbindungsfehler",
socket_error="Socket-Fehler",
faillock_attempts_remaining="Noch {n} Versuch(e) vor Kontosperrung!",
faillock_locked="Konto ist möglicherweise gesperrt",
)
_STRINGS_EN = Strings(
password_placeholder="Password",
reboot_tooltip="Reboot",
shutdown_tooltip="Shut down",
no_session_selected="No session selected",
greetd_sock_not_set="GREETD_SOCK not set",
greetd_sock_not_absolute="GREETD_SOCK is not an absolute path",
greetd_sock_not_socket="GREETD_SOCK does not point to a socket",
greetd_sock_unreachable="GREETD_SOCK unreachable",
auth_failed="Authentication failed",
wrong_password="Wrong password",
multi_stage_unsupported="Multi-stage authentication is not supported",
invalid_session_command="Invalid session command",
session_start_failed="Failed to start session",
reboot_failed="Reboot failed",
shutdown_failed="Shutdown failed",
connection_error="Connection error",
socket_error="Socket error",
faillock_attempts_remaining="{n} attempt(s) remaining before lockout!",
faillock_locked="Account may be locked",
)
_LOCALE_MAP: dict[str, Strings] = {
"de": _STRINGS_DE,
"en": _STRINGS_EN,
}
def detect_locale(locale_conf_path: Path = DEFAULT_LOCALE_CONF) -> str:
"""Determine the system language from LANG env var or /etc/locale.conf."""
lang = os.environ.get("LANG")
if not lang and locale_conf_path.exists():
for line in locale_conf_path.read_text().splitlines():
if line.startswith("LANG="):
lang = line.split("=", 1)[1].strip()
break
if not lang or lang in ("C", "POSIX"):
return "en"
# Extract language prefix: "de_DE.UTF-8" → "de"
lang = lang.split("_")[0].split(".")[0].lower()
if not lang.isalpha():
return "en"
return lang
def load_strings(locale: str | None = None) -> Strings:
"""Return the string table for the given locale, defaulting to English."""
if locale is None:
locale = detect_locale()
return _LOCALE_MAP.get(locale, _STRINGS_EN)

View File

@ -1,64 +0,0 @@
# ABOUTME: greetd IPC protocol implementation — communicates via Unix socket.
# ABOUTME: Uses length-prefixed JSON encoding as specified by the greetd IPC protocol.
import json
import struct
from typing import Any
MAX_PAYLOAD_SIZE = 65536
def _recvall(sock: Any, n: int) -> bytes:
"""Receive exactly n bytes from socket, looping on partial reads."""
buf = bytearray()
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Connection closed while reading data")
buf.extend(chunk)
return bytes(buf)
def send_message(sock: Any, msg: dict) -> None:
"""Send a length-prefixed JSON message to the greetd socket."""
payload = json.dumps(msg).encode("utf-8")
if len(payload) > MAX_PAYLOAD_SIZE:
raise ValueError(f"Payload too large: {len(payload)} bytes (max {MAX_PAYLOAD_SIZE})")
header = struct.pack("=I", len(payload))
sock.sendall(header + payload)
def recv_message(sock: Any) -> dict:
"""Receive a length-prefixed JSON message from the greetd socket."""
header = _recvall(sock, 4)
length = struct.unpack("=I", header)[0]
if length > MAX_PAYLOAD_SIZE:
raise ConnectionError(f"Payload too large: {length} bytes (max {MAX_PAYLOAD_SIZE})")
payload = _recvall(sock, length)
return json.loads(payload.decode("utf-8"))
def create_session(sock: Any, username: str) -> dict:
"""Send a create_session request to greetd and return the response."""
send_message(sock, {"type": "create_session", "username": username})
return recv_message(sock)
def post_auth_response(sock: Any, response: str | None) -> dict:
"""Send an authentication response (e.g. password) to greetd."""
send_message(sock, {"type": "post_auth_message_response", "response": response})
return recv_message(sock)
def start_session(sock: Any, cmd: list[str]) -> dict:
"""Send a start_session request to launch the user's session."""
send_message(sock, {"type": "start_session", "cmd": cmd})
return recv_message(sock)
def cancel_session(sock: Any) -> dict:
"""Cancel the current authentication session."""
send_message(sock, {"type": "cancel_session"})
return recv_message(sock)

View File

@ -1,162 +0,0 @@
# ABOUTME: Entry point for Moongreet — sets up GTK Application and Layer Shell.
# ABOUTME: Handles multi-monitor setup: login UI on primary, wallpaper on secondary monitors.
import logging
import sys
from importlib.resources import files
from pathlib import Path
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Gdk", "4.0")
from gi.repository import Gtk, Gdk
from moongreet.config import load_config, resolve_wallpaper_path
from moongreet.greeter import GreeterWindow, WallpaperWindow
# gtk4-layer-shell is optional for development/testing
try:
gi.require_version("Gtk4LayerShell", "1.0")
from gi.repository import Gtk4LayerShell
HAS_LAYER_SHELL = True
except (ValueError, ImportError):
HAS_LAYER_SHELL = False
LOG_DIR = Path("/var/cache/moongreet")
LOG_FILE = LOG_DIR / "moongreet.log"
logger = logging.getLogger(__name__)
def _setup_logging() -> None:
"""Configure logging to file and stderr."""
root = logging.getLogger()
root.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s %(levelname)s %(name)s: %(message)s"
)
# Always log to stderr
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.INFO)
stderr_handler.setFormatter(formatter)
root.addHandler(stderr_handler)
# Log to file if the directory is writable
if LOG_DIR.is_dir():
try:
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
root.addHandler(file_handler)
except PermissionError:
logger.warning("Cannot write to %s", LOG_FILE)
class MoongreetApp(Gtk.Application):
"""GTK Application for the Moongreet greeter."""
def __init__(self) -> None:
super().__init__(application_id="dev.moonarch.moongreet")
self._wallpaper_ctx = None
self._secondary_windows: list[WallpaperWindow] = []
def do_activate(self) -> None:
"""Create and present greeter windows on all monitors."""
display = Gdk.Display.get_default()
if display is None:
logger.error("No display available — cannot start greeter UI")
return
self._register_icons(display)
self._load_css(display)
# Resolve wallpaper once, share across all windows
config = load_config()
bg_path, self._wallpaper_ctx = resolve_wallpaper_path(config)
monitors = display.get_monitors()
primary_monitor = None
# Find primary monitor — fall back to first available
for i in range(monitors.get_n_items()):
monitor = monitors.get_item(i)
if hasattr(monitor, 'is_primary') and monitor.is_primary():
primary_monitor = monitor
break
if primary_monitor is None and monitors.get_n_items() > 0:
primary_monitor = monitors.get_item(0)
# Main greeter window (login UI) on primary monitor
greeter = GreeterWindow(bg_path=bg_path, config=config, application=self)
if HAS_LAYER_SHELL:
self._setup_layer_shell(greeter, keyboard=True)
if primary_monitor is not None:
Gtk4LayerShell.set_monitor(greeter, primary_monitor)
greeter.present()
# Wallpaper-only windows on secondary monitors
for i in range(monitors.get_n_items()):
monitor = monitors.get_item(i)
if monitor == primary_monitor:
continue
wallpaper_win = WallpaperWindow(bg_path=bg_path, application=self)
if HAS_LAYER_SHELL:
self._setup_layer_shell(wallpaper_win, keyboard=False)
Gtk4LayerShell.set_monitor(wallpaper_win, monitor)
wallpaper_win.present()
self._secondary_windows.append(wallpaper_win)
def do_shutdown(self) -> None:
"""Clean up wallpaper context manager on exit."""
if self._wallpaper_ctx is not None:
self._wallpaper_ctx.__exit__(None, None, None)
self._wallpaper_ctx = None
Gtk.Application.do_shutdown(self)
def _register_icons(self, display: Gdk.Display) -> None:
"""Register custom icons from the package data/icons directory."""
icons_dir = files("moongreet") / "data" / "icons"
icon_theme = Gtk.IconTheme.get_for_display(display)
icon_theme.add_search_path(str(icons_dir))
def _load_css(self, display: Gdk.Display) -> None:
"""Load the CSS stylesheet for the greeter."""
css_provider = Gtk.CssProvider()
css_path = files("moongreet") / "style.css"
css_provider.load_from_path(str(css_path))
Gtk.StyleContext.add_provider_for_display(
display,
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
)
def _setup_layer_shell(self, window: Gtk.Window, keyboard: bool = False) -> None:
"""Configure gtk4-layer-shell for fullscreen display."""
Gtk4LayerShell.init_for_window(window)
Gtk4LayerShell.set_layer(window, Gtk4LayerShell.Layer.TOP)
if keyboard:
Gtk4LayerShell.set_keyboard_mode(
window, Gtk4LayerShell.KeyboardMode.EXCLUSIVE
)
# Anchor to all edges for fullscreen
for edge in [
Gtk4LayerShell.Edge.TOP,
Gtk4LayerShell.Edge.BOTTOM,
Gtk4LayerShell.Edge.LEFT,
Gtk4LayerShell.Edge.RIGHT,
]:
Gtk4LayerShell.set_anchor(window, edge, True)
def main() -> None:
"""Run the Moongreet application."""
_setup_logging()
logger.info("Moongreet starting")
app = MoongreetApp()
app.run(sys.argv)
if __name__ == "__main__":
main()

View File

@ -1,17 +0,0 @@
# ABOUTME: Power actions — reboot and shutdown via loginctl.
# ABOUTME: Simple wrappers around system commands for the greeter UI.
import subprocess
POWER_TIMEOUT = 30
def reboot() -> None:
"""Reboot the system via loginctl."""
subprocess.run(["loginctl", "reboot"], check=True, timeout=POWER_TIMEOUT)
def shutdown() -> None:
"""Shut down the system via loginctl."""
subprocess.run(["loginctl", "poweroff"], check=True, timeout=POWER_TIMEOUT)

View File

@ -1,63 +0,0 @@
# ABOUTME: Session detection — discovers available Wayland and X11 sessions.
# ABOUTME: Parses .desktop files from standard session directories.
import configparser
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
DEFAULT_WAYLAND_DIRS = (Path("/usr/share/wayland-sessions"),)
DEFAULT_XSESSION_DIRS = (Path("/usr/share/xsessions"),)
@dataclass
class Session:
"""Represents an available login session."""
name: str
exec_cmd: str
session_type: str # "wayland" or "x11"
def _parse_desktop_file(path: Path, session_type: str) -> Session | None:
"""Parse a .desktop file and return a Session, or None if invalid."""
config = configparser.ConfigParser(interpolation=None)
config.read(path)
section = "Desktop Entry"
if not config.has_section(section):
return None
name = config.get(section, "Name", fallback=None)
exec_cmd = config.get(section, "Exec", fallback=None)
if not name or not exec_cmd:
return None
return Session(name=name, exec_cmd=exec_cmd, session_type=session_type)
def get_sessions(
wayland_dirs: Sequence[Path] = DEFAULT_WAYLAND_DIRS,
xsession_dirs: Sequence[Path] = DEFAULT_XSESSION_DIRS,
) -> list[Session]:
"""Discover available sessions from .desktop files."""
sessions: list[Session] = []
for directory in wayland_dirs:
if not directory.exists():
continue
for desktop_file in sorted(directory.glob("*.desktop")):
session = _parse_desktop_file(desktop_file, "wayland")
if session:
sessions.append(session)
for directory in xsession_dirs:
if not directory.exists():
continue
for desktop_file in sorted(directory.glob("*.desktop")):
session = _parse_desktop_file(desktop_file, "x11")
if session:
sessions.append(session)
return sessions

View File

@ -1,109 +0,0 @@
# ABOUTME: User detection — parses /etc/passwd for login users, finds avatars and GTK themes.
# ABOUTME: Provides User dataclass and helper functions for the greeter UI.
import configparser
from dataclasses import dataclass
from pathlib import Path
NOLOGIN_SHELLS = {"/usr/sbin/nologin", "/sbin/nologin", "/bin/false", "/usr/bin/nologin"}
MIN_UID = 1000
MAX_UID = 65533
DEFAULT_PASSWD = Path("/etc/passwd")
DEFAULT_ACCOUNTSSERVICE_DIR = Path("/var/lib/AccountsService/icons")
@dataclass
class User:
"""Represents a system user suitable for login."""
username: str
uid: int
gecos: str
home: Path
shell: str
@property
def display_name(self) -> str:
"""Return gecos if available, otherwise username."""
return self.gecos if self.gecos else self.username
def get_users(passwd_path: Path = DEFAULT_PASSWD) -> list[User]:
"""Parse /etc/passwd and return users with UID in the login range."""
users: list[User] = []
if not passwd_path.exists():
return users
for line in passwd_path.read_text().splitlines():
parts = line.split(":")
if len(parts) < 7:
continue
username, _, uid_str, _, gecos, home, shell = parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], parts[6]
try:
uid = int(uid_str)
except ValueError:
continue
if uid < MIN_UID or uid > MAX_UID:
continue
if shell in NOLOGIN_SHELLS:
continue
if "/" in username or username.startswith("."):
continue
users.append(User(
username=username,
uid=uid,
gecos=gecos,
home=Path(home),
shell=shell,
))
return users
def get_avatar_path(
username: str,
accountsservice_dir: Path = DEFAULT_ACCOUNTSSERVICE_DIR,
home_dir: Path | None = None,
) -> Path | None:
"""Find avatar for a user: AccountsService icon → ~/.face → None."""
# AccountsService icon
icon = accountsservice_dir / username
if icon.exists() and not icon.is_symlink():
return icon
# ~/.face fallback
if home_dir is not None:
face = home_dir / ".face"
if face.exists() and not face.is_symlink():
return face
return None
def get_user_gtk_theme(config_dir: Path | None = None) -> str | None:
"""Read the GTK theme name from a user's gtk-4.0/settings.ini."""
if config_dir is None:
return None
settings_file = config_dir / "settings.ini"
if not settings_file.exists():
return None
config = configparser.ConfigParser(interpolation=None)
try:
config.read(settings_file)
except configparser.Error:
return None
if config.has_option("Settings", "gtk-theme-name"):
theme = config.get("Settings", "gtk-theme-name")
if theme:
return theme
return None

112
src/power.rs Normal file
View File

@ -0,0 +1,112 @@
// ABOUTME: Power actions — reboot and shutdown via loginctl.
// ABOUTME: Wrappers around system commands for the greeter UI.
use std::fmt;
use std::process::Command;
#[derive(Debug)]
pub enum PowerError {
CommandFailed { action: &'static str, message: String },
Timeout { action: &'static str },
}
impl fmt::Display for PowerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PowerError::CommandFailed { action, message } => {
write!(f, "{action} failed: {message}")
}
PowerError::Timeout { action } => {
write!(f, "{action} timed out")
}
}
}
}
impl std::error::Error for PowerError {}
/// Run a command and return a PowerError on failure.
fn run_command(action: &'static str, program: &str, args: &[&str]) -> Result<(), PowerError> {
let child = Command::new(program)
.args(args)
.spawn()
.map_err(|e| PowerError::CommandFailed {
action,
message: e.to_string(),
})?;
let output = child
.wait_with_output()
.map_err(|e| PowerError::CommandFailed {
action,
message: e.to_string(),
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(PowerError::CommandFailed {
action,
message: format!("exit code {}: {}", output.status, stderr.trim()),
});
}
Ok(())
}
/// Reboot the system via loginctl.
pub fn reboot() -> Result<(), PowerError> {
run_command("reboot", "/usr/bin/loginctl", &["reboot"])
}
/// Shut down the system via loginctl.
pub fn shutdown() -> Result<(), PowerError> {
run_command("shutdown", "/usr/bin/loginctl", &["poweroff"])
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn power_error_command_failed_display() {
let err = PowerError::CommandFailed {
action: "reboot",
message: "No such file or directory".to_string(),
};
assert_eq!(err.to_string(), "reboot failed: No such file or directory");
}
#[test]
fn power_error_timeout_display() {
let err = PowerError::Timeout { action: "shutdown" };
assert_eq!(err.to_string(), "shutdown timed out");
}
#[test]
fn run_command_returns_error_for_missing_binary() {
let result = run_command("test", "nonexistent-binary-xyz", &[]);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, PowerError::CommandFailed { action: "test", .. }));
}
#[test]
fn run_command_returns_error_on_nonzero_exit() {
let result = run_command("test", "false", &[]);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, PowerError::CommandFailed { action: "test", .. }));
}
#[test]
fn run_command_succeeds_for_true() {
let result = run_command("test", "true", &[]);
assert!(result.is_ok());
}
#[test]
fn run_command_passes_args() {
let result = run_command("test", "echo", &["hello", "world"]);
assert!(result.is_ok());
}
}

228
src/sessions.rs Normal file
View File

@ -0,0 +1,228 @@
// ABOUTME: Session detection — discovers available Wayland and X11 sessions.
// ABOUTME: Parses .desktop files from standard session directories.
use std::fs;
use std::path::{Path, PathBuf};
const DEFAULT_WAYLAND_DIRS: &[&str] = &["/usr/share/wayland-sessions"];
const DEFAULT_XSESSION_DIRS: &[&str] = &["/usr/share/xsessions"];
/// Represents an available login session.
#[derive(Debug, Clone)]
pub struct Session {
pub name: String,
pub exec_cmd: String,
pub session_type: String,
}
/// Parse a .desktop file and return a Session, or None if invalid.
fn parse_desktop_file(path: &Path, session_type: &str) -> Option<Session> {
let content = fs::read_to_string(path).ok()?;
let mut in_section = false;
let mut name: Option<String> = None;
let mut exec_cmd: Option<String> = None;
for line in content.lines() {
let line = line.trim();
if line.starts_with('[') {
in_section = line == "[Desktop Entry]";
continue;
}
if !in_section {
continue;
}
if let Some(value) = line.strip_prefix("Name=") {
if name.is_none() {
name = Some(value.to_string());
}
} else if let Some(value) = line.strip_prefix("Exec=") {
if exec_cmd.is_none() {
exec_cmd = Some(value.to_string());
}
}
}
let name = name.filter(|s| !s.is_empty())?;
let exec_cmd = exec_cmd.filter(|s| !s.is_empty())?;
Some(Session {
name,
exec_cmd,
session_type: session_type.to_string(),
})
}
/// Discover available sessions from .desktop files.
pub fn get_sessions(
wayland_dirs: Option<&[PathBuf]>,
xsession_dirs: Option<&[PathBuf]>,
) -> Vec<Session> {
let default_wayland: Vec<PathBuf> =
DEFAULT_WAYLAND_DIRS.iter().map(PathBuf::from).collect();
let default_xsession: Vec<PathBuf> =
DEFAULT_XSESSION_DIRS.iter().map(PathBuf::from).collect();
let wayland = wayland_dirs.unwrap_or(&default_wayland);
let xsession = xsession_dirs.unwrap_or(&default_xsession);
let mut sessions = Vec::new();
for (dirs, session_type) in [(wayland, "wayland"), (xsession, "x11")] {
for directory in dirs {
let entries = match fs::read_dir(directory) {
Ok(e) => e,
Err(_) => continue,
};
let mut paths: Vec<PathBuf> = entries
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| p.extension().is_some_and(|ext| ext == "desktop"))
.collect();
paths.sort();
for path in paths {
if let Some(session) = parse_desktop_file(&path, session_type) {
sessions.push(session);
}
}
}
}
sessions
}
#[cfg(test)]
mod tests {
use super::*;
fn write_desktop(dir: &Path, name: &str, content: &str) {
fs::write(dir.join(name), content).unwrap();
}
#[test]
fn parse_valid_desktop_file() {
let dir = tempfile::tempdir().unwrap();
let file = dir.path().join("test.desktop");
fs::write(
&file,
"[Desktop Entry]\nName=Niri\nExec=niri-session\n",
)
.unwrap();
let session = parse_desktop_file(&file, "wayland").unwrap();
assert_eq!(session.name, "Niri");
assert_eq!(session.exec_cmd, "niri-session");
assert_eq!(session.session_type, "wayland");
}
#[test]
fn parse_desktop_file_missing_name() {
let dir = tempfile::tempdir().unwrap();
let file = dir.path().join("test.desktop");
fs::write(&file, "[Desktop Entry]\nExec=niri-session\n").unwrap();
assert!(parse_desktop_file(&file, "wayland").is_none());
}
#[test]
fn parse_desktop_file_missing_exec() {
let dir = tempfile::tempdir().unwrap();
let file = dir.path().join("test.desktop");
fs::write(&file, "[Desktop Entry]\nName=Niri\n").unwrap();
assert!(parse_desktop_file(&file, "wayland").is_none());
}
#[test]
fn parse_desktop_file_wrong_section() {
let dir = tempfile::tempdir().unwrap();
let file = dir.path().join("test.desktop");
fs::write(
&file,
"[Other Section]\nName=Niri\nExec=niri-session\n",
)
.unwrap();
assert!(parse_desktop_file(&file, "wayland").is_none());
}
#[test]
fn get_sessions_finds_wayland_and_x11() {
let wayland_dir = tempfile::tempdir().unwrap();
let x11_dir = tempfile::tempdir().unwrap();
write_desktop(
wayland_dir.path(),
"niri.desktop",
"[Desktop Entry]\nName=Niri\nExec=niri-session\n",
);
write_desktop(
x11_dir.path(),
"i3.desktop",
"[Desktop Entry]\nName=i3\nExec=i3\n",
);
let wayland_paths = vec![wayland_dir.path().to_path_buf()];
let x11_paths = vec![x11_dir.path().to_path_buf()];
let sessions = get_sessions(Some(&wayland_paths), Some(&x11_paths));
assert_eq!(sessions.len(), 2);
assert_eq!(sessions[0].name, "Niri");
assert_eq!(sessions[0].session_type, "wayland");
assert_eq!(sessions[1].name, "i3");
assert_eq!(sessions[1].session_type, "x11");
}
#[test]
fn get_sessions_skips_missing_dirs() {
let sessions = get_sessions(
Some(&[PathBuf::from("/nonexistent")]),
Some(&[PathBuf::from("/also-nonexistent")]),
);
assert!(sessions.is_empty());
}
#[test]
fn get_sessions_skips_invalid_files() {
let dir = tempfile::tempdir().unwrap();
write_desktop(
dir.path(),
"valid.desktop",
"[Desktop Entry]\nName=Valid\nExec=valid\n",
);
write_desktop(
dir.path(),
"invalid.desktop",
"[Desktop Entry]\nName=Invalid\n",
);
// Non-.desktop file
fs::write(dir.path().join("readme.txt"), "not a session").unwrap();
let paths = vec![dir.path().to_path_buf()];
let sessions = get_sessions(Some(&paths), Some(&[]));
assert_eq!(sessions.len(), 1);
assert_eq!(sessions[0].name, "Valid");
}
#[test]
fn sessions_sorted_alphabetically() {
let dir = tempfile::tempdir().unwrap();
write_desktop(
dir.path(),
"z-sway.desktop",
"[Desktop Entry]\nName=Sway\nExec=sway\n",
);
write_desktop(
dir.path(),
"a-niri.desktop",
"[Desktop Entry]\nName=Niri\nExec=niri-session\n",
);
let paths = vec![dir.path().to_path_buf()];
let sessions = get_sessions(Some(&paths), Some(&[]));
assert_eq!(sessions.len(), 2);
assert_eq!(sessions[0].name, "Niri");
assert_eq!(sessions[1].name, "Sway");
}
}

280
src/users.rs Normal file
View File

@ -0,0 +1,280 @@
// ABOUTME: User detection — parses /etc/passwd for login users and finds avatars.
// ABOUTME: Provides User struct and helpers for the greeter UI.
use std::fs;
use std::path::{Path, PathBuf};
const MIN_UID: u32 = 1000;
const MAX_UID: u32 = 65533;
const DEFAULT_PASSWD: &str = "/etc/passwd";
const DEFAULT_ACCOUNTSSERVICE_DIR: &str = "/var/lib/AccountsService/icons";
const GRESOURCE_PREFIX: &str = "/dev/moonarch/moongreet";
/// Shells that indicate a user cannot log in.
const NOLOGIN_SHELLS: &[&str] = &[
"/usr/sbin/nologin",
"/sbin/nologin",
"/bin/false",
"/usr/bin/nologin",
];
/// Represents a system user suitable for login.
#[derive(Debug, Clone)]
pub struct User {
pub username: String,
pub uid: u32,
pub gecos: String,
pub home: PathBuf,
pub shell: String,
}
impl User {
/// Return the display name (GECOS if available, otherwise username).
pub fn display_name(&self) -> &str {
if self.gecos.is_empty() {
&self.username
} else {
&self.gecos
}
}
}
/// Parse /etc/passwd and return users with UID in the login range.
pub fn get_users(passwd_path: Option<&Path>) -> Vec<User> {
let path = passwd_path.unwrap_or(Path::new(DEFAULT_PASSWD));
let content = match fs::read_to_string(path) {
Ok(c) => c,
Err(_) => return Vec::new(),
};
let mut users = Vec::new();
for line in content.lines() {
let parts: Vec<&str> = line.split(':').collect();
if parts.len() < 7 {
continue;
}
let username = parts[0];
let uid_str = parts[2];
let gecos = parts[4];
let home = parts[5];
let shell = parts[6];
let uid = match uid_str.parse::<u32>() {
Ok(u) => u,
Err(_) => continue,
};
if uid < MIN_UID || uid > MAX_UID {
continue;
}
if NOLOGIN_SHELLS.contains(&shell) {
continue;
}
// Path traversal prevention
if username.contains('/') || username.starts_with('.') {
continue;
}
users.push(User {
username: username.to_string(),
uid,
gecos: gecos.to_string(),
home: PathBuf::from(home),
shell: shell.to_string(),
});
}
users
}
/// Find avatar for a user: AccountsService icon > ~/.face > None.
/// Rejects symlinks to prevent path traversal.
pub fn get_avatar_path(username: &str, home: &Path) -> Option<PathBuf> {
get_avatar_path_with(username, home, Path::new(DEFAULT_ACCOUNTSSERVICE_DIR))
}
/// Find avatar with configurable AccountsService dir (for testing).
pub fn get_avatar_path_with(
username: &str,
home: &Path,
accountsservice_dir: &Path,
) -> Option<PathBuf> {
// AccountsService icon takes priority
if accountsservice_dir.exists() {
let icon = accountsservice_dir.join(username);
if icon.exists() && !icon.is_symlink() {
return Some(icon);
}
}
// ~/.face fallback
let face = home.join(".face");
if face.exists() && !face.is_symlink() {
return Some(face);
}
None
}
/// Return the GResource path to the default avatar SVG.
pub fn get_default_avatar_path() -> String {
format!("{GRESOURCE_PREFIX}/default-avatar.svg")
}
#[cfg(test)]
mod tests {
use super::*;
fn make_passwd(dir: &Path, content: &str) -> PathBuf {
let path = dir.join("passwd");
fs::write(&path, content).unwrap();
path
}
#[test]
fn parse_normal_user() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
"testuser:x:1000:1000:Test User:/home/testuser:/bin/bash\n",
);
let users = get_users(Some(&path));
assert_eq!(users.len(), 1);
assert_eq!(users[0].username, "testuser");
assert_eq!(users[0].uid, 1000);
assert_eq!(users[0].display_name(), "Test User");
assert_eq!(users[0].home, PathBuf::from("/home/testuser"));
}
#[test]
fn skip_system_users() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(dir.path(), "root:x:0:0:root:/root:/bin/bash\n");
let users = get_users(Some(&path));
assert!(users.is_empty());
}
#[test]
fn skip_nologin_users() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
"nobody:x:1000:1000::/home/nobody:/usr/sbin/nologin\n",
);
let users = get_users(Some(&path));
assert!(users.is_empty());
}
#[test]
fn skip_users_with_slash_in_name() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
"bad/user:x:1000:1000::/home/bad:/bin/bash\n",
);
let users = get_users(Some(&path));
assert!(users.is_empty());
}
#[test]
fn skip_users_starting_with_dot() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
".hidden:x:1000:1000::/home/hidden:/bin/bash\n",
);
let users = get_users(Some(&path));
assert!(users.is_empty());
}
#[test]
fn empty_gecos_uses_username() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
"testuser:x:1000:1000::/home/testuser:/bin/bash\n",
);
let users = get_users(Some(&path));
assert_eq!(users[0].display_name(), "testuser");
}
#[test]
fn multiple_users() {
let dir = tempfile::tempdir().unwrap();
let path = make_passwd(
dir.path(),
"alice:x:1000:1000:Alice:/home/alice:/bin/bash\n\
bob:x:1001:1001:Bob:/home/bob:/bin/zsh\n",
);
let users = get_users(Some(&path));
assert_eq!(users.len(), 2);
assert_eq!(users[0].username, "alice");
assert_eq!(users[1].username, "bob");
}
#[test]
fn returns_empty_for_missing_file() {
let users = get_users(Some(Path::new("/nonexistent/passwd")));
assert!(users.is_empty());
}
#[test]
fn accountsservice_icon_takes_priority() {
let dir = tempfile::tempdir().unwrap();
let icons_dir = dir.path().join("icons");
fs::create_dir(&icons_dir).unwrap();
let icon = icons_dir.join("testuser");
fs::write(&icon, "fake image").unwrap();
let home = dir.path().join("home");
fs::create_dir(&home).unwrap();
let face = home.join(".face");
fs::write(&face, "fake face").unwrap();
let path = get_avatar_path_with("testuser", &home, &icons_dir);
assert_eq!(path, Some(icon));
}
#[test]
fn face_file_used_when_no_accountsservice() {
let dir = tempfile::tempdir().unwrap();
let home = dir.path().join("home");
fs::create_dir(&home).unwrap();
let face = home.join(".face");
fs::write(&face, "fake face").unwrap();
let path = get_avatar_path_with("testuser", &home, Path::new("/nonexistent"));
assert_eq!(path, Some(face));
}
#[test]
fn returns_none_when_no_avatar() {
let dir = tempfile::tempdir().unwrap();
let path = get_avatar_path_with("testuser", dir.path(), Path::new("/nonexistent"));
assert!(path.is_none());
}
#[test]
fn rejects_symlink_avatar() {
let dir = tempfile::tempdir().unwrap();
let home = dir.path().join("home");
fs::create_dir(&home).unwrap();
let real_file = dir.path().join("real-avatar");
fs::write(&real_file, "fake").unwrap();
std::os::unix::fs::symlink(&real_file, home.join(".face")).unwrap();
let path = get_avatar_path_with("testuser", &home, Path::new("/nonexistent"));
assert!(path.is_none());
}
#[test]
fn default_avatar_path_is_gresource() {
let path = get_default_avatar_path();
assert!(path.contains("default-avatar.svg"));
assert!(path.starts_with("/dev/moonarch/moongreet"));
}
}

View File

View File

@ -1,110 +0,0 @@
# ABOUTME: Tests for configuration loading from moongreet.toml.
# ABOUTME: Verifies parsing of appearance and behavior settings.
from pathlib import Path
import pytest
from moongreet.config import load_config, resolve_wallpaper_path, Config
class TestLoadConfig:
"""Tests for loading moongreet.toml configuration."""
def test_loads_background_path(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text(
"[appearance]\n"
'background = "/usr/share/backgrounds/test.jpg"\n'
)
config = load_config(toml_file)
assert config.background == Path("/usr/share/backgrounds/test.jpg")
def test_returns_none_background_when_missing(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text("[appearance]\n")
config = load_config(toml_file)
assert config.background is None
def test_returns_defaults_for_missing_file(self, tmp_path: Path) -> None:
config = load_config(tmp_path / "nonexistent.toml")
assert config.background is None
def test_returns_defaults_for_corrupt_toml(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text("this is not valid [[[ toml !!!")
config = load_config(toml_file)
assert config.background is None
def test_loads_gtk_theme(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text(
"[appearance]\n"
'gtk-theme = "Catppuccin-Mocha-Standard-Blue-Dark"\n'
)
config = load_config(toml_file)
assert config.gtk_theme == "Catppuccin-Mocha-Standard-Blue-Dark"
def test_returns_none_gtk_theme_when_missing(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text("[appearance]\n")
config = load_config(toml_file)
assert config.gtk_theme is None
def test_resolves_relative_path_against_config_dir(self, tmp_path: Path) -> None:
toml_file = tmp_path / "moongreet.toml"
toml_file.write_text(
"[appearance]\n"
'background = "wallpaper.jpg"\n'
)
config = load_config(toml_file)
assert config.background == tmp_path / "wallpaper.jpg"
class TestResolveWallpaperPath:
"""Tests for resolving the wallpaper path from config or package default."""
def test_uses_configured_path_when_exists(self, tmp_path: Path) -> None:
wallpaper = tmp_path / "custom.jpg"
wallpaper.write_bytes(b"fake-image")
config = Config(background=wallpaper)
path, ctx = resolve_wallpaper_path(config)
assert path == wallpaper
assert ctx is None
def test_falls_back_to_package_default(self) -> None:
config = Config(background=None)
path, ctx = resolve_wallpaper_path(config)
assert path is not None
assert path.exists()
assert ctx is not None
# Clean up context manager
ctx.__exit__(None, None, None)
def test_falls_back_when_configured_path_missing(self, tmp_path: Path) -> None:
config = Config(background=tmp_path / "nonexistent.jpg")
path, ctx = resolve_wallpaper_path(config)
assert path is not None
assert path.exists()
assert ctx is not None
ctx.__exit__(None, None, None)

View File

@ -1,126 +0,0 @@
# ABOUTME: Tests for locale detection and string lookup.
# ABOUTME: Verifies DE/EN selection based on system locale.
from pathlib import Path
import pytest
from moongreet.i18n import detect_locale, load_strings, Strings
class TestDetectLocale:
"""Tests for system locale detection."""
def test_reads_lang_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "de_DE.UTF-8")
result = detect_locale()
assert result == "de"
def test_reads_lang_without_region(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "en_US.UTF-8")
result = detect_locale()
assert result == "en"
def test_falls_back_to_locale_conf(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("LANG", raising=False)
locale_conf = tmp_path / "locale.conf"
locale_conf.write_text("LANG=de_AT.UTF-8\n")
result = detect_locale(locale_conf_path=locale_conf)
assert result == "de"
def test_defaults_to_english(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("LANG", raising=False)
missing = tmp_path / "nonexistent"
result = detect_locale(locale_conf_path=missing)
assert result == "en"
def test_handles_bare_language_code(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "de")
result = detect_locale()
assert result == "de"
def test_handles_c_locale(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "C")
result = detect_locale()
assert result == "en"
def test_handles_posix_locale(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "POSIX")
result = detect_locale()
assert result == "en"
def test_rejects_non_alpha_lang(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANG", "../../etc")
result = detect_locale()
assert result == "en"
class TestLoadStrings:
"""Tests for loading the correct string table."""
def test_loads_german_strings(self) -> None:
strings = load_strings("de")
assert strings.password_placeholder == "Passwort"
assert strings.reboot_tooltip == "Neustart"
assert strings.shutdown_tooltip == "Herunterfahren"
def test_loads_english_strings(self) -> None:
strings = load_strings("en")
assert strings.password_placeholder == "Password"
assert strings.reboot_tooltip == "Reboot"
assert strings.shutdown_tooltip == "Shut down"
def test_unknown_locale_falls_back_to_english(self) -> None:
strings = load_strings("fr")
assert strings.password_placeholder == "Password"
def test_returns_strings_dataclass(self) -> None:
strings = load_strings("de")
assert isinstance(strings, Strings)
def test_error_messages_are_present(self) -> None:
strings = load_strings("en")
assert strings.wrong_password
assert strings.auth_failed
assert strings.reboot_failed
assert strings.shutdown_failed
assert strings.no_session_selected
assert strings.multi_stage_unsupported
assert strings.invalid_session_command
assert strings.session_start_failed
assert strings.faillock_locked
def test_faillock_warning_template(self) -> None:
strings = load_strings("de")
# Template should accept an int for remaining attempts
result = strings.faillock_attempts_remaining.format(n=1)
assert "1" in result
def test_connection_error_is_generic(self) -> None:
strings = load_strings("en")
# Error messages should not contain format placeholders (no info leakage)
assert "{" not in strings.connection_error
assert "{" not in strings.socket_error

View File

@ -1,478 +0,0 @@
# ABOUTME: Integration tests — verifies the login flow end-to-end via a mock greetd socket.
# ABOUTME: Tests the IPC sequence: create_session → post_auth → start_session.
import json
import os
import socket
import struct
import threading
from pathlib import Path
import pytest
from moongreet.greeter import faillock_warning, FAILLOCK_MAX_ATTEMPTS, LAST_SESSION_DIR
from moongreet.i18n import load_strings
from moongreet.ipc import create_session, post_auth_response, start_session, cancel_session
class MockGreetd:
"""A mock greetd server that listens on a Unix socket and responds to IPC messages."""
def __init__(self, sock_path: Path) -> None:
self.sock_path = sock_path
self._responses: list[dict] = []
self._received: list[dict] = []
self._server: socket.socket | None = None
def expect(self, response: dict) -> None:
"""Queue a response to send for the next received message."""
self._responses.append(response)
@property
def received(self) -> list[dict]:
return self._received
def start(self) -> None:
"""Start the mock server in a background thread."""
self._server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._server.bind(str(self.sock_path))
self._server.listen(1)
self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start()
@staticmethod
def _recvall(conn: socket.socket, n: int) -> bytes:
"""Receive exactly n bytes from a socket, handling fragmented reads."""
buf = bytearray()
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
break
buf.extend(chunk)
return bytes(buf)
def _serve(self) -> None:
conn, _ = self._server.accept()
try:
for response in self._responses:
# Receive a message
header = self._recvall(conn, 4)
if len(header) < 4:
break
length = struct.unpack("=I", header)[0]
payload = self._recvall(conn, length)
msg = json.loads(payload.decode("utf-8"))
self._received.append(msg)
# Send response
resp_payload = json.dumps(response).encode("utf-8")
conn.sendall(struct.pack("=I", len(resp_payload)) + resp_payload)
finally:
conn.close()
def stop(self) -> None:
if self._server:
self._server.close()
class TestLoginFlow:
"""Integration tests for the complete login flow via mock greetd."""
def test_successful_login(self, tmp_path: Path) -> None:
"""Simulate a complete successful login: create → auth → start."""
sock_path = tmp_path / "greetd.sock"
mock = MockGreetd(sock_path)
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
mock.expect({"type": "success"})
mock.expect({"type": "success"})
mock.start()
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(sock_path))
# Step 1: Create session
response = create_session(sock, "dominik")
assert response["type"] == "auth_message"
# Step 2: Send password
response = post_auth_response(sock, "geheim")
assert response["type"] == "success"
# Step 3: Start session
response = start_session(sock, ["Hyprland"])
assert response["type"] == "success"
sock.close()
finally:
mock.stop()
# Verify what the mock received
assert mock.received[0] == {"type": "create_session", "username": "dominik"}
assert mock.received[1] == {"type": "post_auth_message_response", "response": "geheim"}
assert mock.received[2] == {"type": "start_session", "cmd": ["Hyprland"]}
def test_wrong_password_sends_cancel(self, tmp_path: Path) -> None:
"""After a failed login, cancel_session must be sent to free the greetd session."""
sock_path = tmp_path / "greetd.sock"
mock = MockGreetd(sock_path)
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
mock.expect({"type": "error", "error_type": "auth_error", "description": "Authentication failed"})
mock.expect({"type": "success"}) # Response to cancel_session
mock.start()
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(sock_path))
response = create_session(sock, "dominik")
assert response["type"] == "auth_message"
response = post_auth_response(sock, "falsch")
assert response["type"] == "error"
assert response["description"] == "Authentication failed"
# The greeter must cancel the session after auth failure
response = cancel_session(sock)
assert response["type"] == "success"
sock.close()
finally:
mock.stop()
assert mock.received[2] == {"type": "cancel_session"}
def test_stale_session_cancel_and_retry(self, tmp_path: Path) -> None:
"""When create_session fails due to a stale session, cancel and retry."""
sock_path = tmp_path / "greetd.sock"
mock = MockGreetd(sock_path)
# First create_session → error (stale session)
mock.expect({"type": "error", "error_type": "error", "description": "a session is already being configured"})
# cancel_session → success
mock.expect({"type": "success"})
# Second create_session → auth_message (retry succeeds)
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
# post_auth_response → success
mock.expect({"type": "success"})
# start_session → success
mock.expect({"type": "success"})
mock.start()
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(sock_path))
# Step 1: Create session fails
response = create_session(sock, "dominik")
assert response["type"] == "error"
# Step 2: Cancel stale session
response = cancel_session(sock)
assert response["type"] == "success"
# Step 3: Retry create session
response = create_session(sock, "dominik")
assert response["type"] == "auth_message"
# Step 4: Send password
response = post_auth_response(sock, "geheim")
assert response["type"] == "success"
# Step 5: Start session
response = start_session(sock, ["niri-session"])
assert response["type"] == "success"
sock.close()
finally:
mock.stop()
assert mock.received[0] == {"type": "create_session", "username": "dominik"}
assert mock.received[1] == {"type": "cancel_session"}
assert mock.received[2] == {"type": "create_session", "username": "dominik"}
def test_multi_stage_auth_sends_cancel(self, tmp_path: Path) -> None:
"""When greetd sends a second auth_message after password, cancel the session."""
sock_path = tmp_path / "greetd.sock"
mock = MockGreetd(sock_path)
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "TOTP:"})
mock.expect({"type": "success"}) # Response to cancel_session
mock.start()
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(sock_path))
# Step 1: Create session
response = create_session(sock, "dominik")
assert response["type"] == "auth_message"
# Step 2: Send password — greetd responds with another auth_message
response = post_auth_response(sock, "geheim")
assert response["type"] == "auth_message"
# Step 3: Cancel because multi-stage auth is not supported
response = cancel_session(sock)
assert response["type"] == "success"
sock.close()
finally:
mock.stop()
# Verify cancel was sent
assert mock.received[2] == {"type": "cancel_session"}
def test_cancel_session(self, tmp_path: Path) -> None:
"""Simulate cancelling a session after create."""
sock_path = tmp_path / "greetd.sock"
mock = MockGreetd(sock_path)
mock.expect({"type": "auth_message", "auth_message_type": "secret", "auth_message": "Password:"})
mock.expect({"type": "success"})
mock.start()
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(sock_path))
create_session(sock, "dominik")
response = cancel_session(sock)
assert response["type"] == "success"
sock.close()
finally:
mock.stop()
assert mock.received[1] == {"type": "cancel_session"}
class TestSessionCancellation:
"""Tests for cancelling an in-progress greetd session during user switch."""
def test_cancel_closes_socket_and_sets_event(self, tmp_path: Path) -> None:
"""_cancel_pending_session should close the socket and set the cancelled event."""
from moongreet.greeter import GreeterWindow
win = GreeterWindow.__new__(GreeterWindow)
win._greetd_sock_lock = threading.Lock()
win._login_cancelled = threading.Event()
# Create a real socket pair to verify close
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock_path = tmp_path / "test.sock"
server.bind(str(sock_path))
server.listen(1)
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(str(sock_path))
server.close()
win._greetd_sock = client
win._cancel_pending_session()
assert win._login_cancelled.is_set()
assert win._greetd_sock is None
def test_cancel_is_noop_without_socket(self) -> None:
"""_cancel_pending_session should be safe to call when no socket exists."""
from moongreet.greeter import GreeterWindow
win = GreeterWindow.__new__(GreeterWindow)
win._greetd_sock_lock = threading.Lock()
win._login_cancelled = threading.Event()
win._greetd_sock = None
win._cancel_pending_session()
assert win._login_cancelled.is_set()
assert win._greetd_sock is None
def test_cancel_does_not_block_main_thread(self, tmp_path: Path) -> None:
"""_cancel_pending_session must not do blocking I/O — only close the socket."""
from moongreet.greeter import GreeterWindow
win = GreeterWindow.__new__(GreeterWindow)
win._greetd_sock_lock = threading.Lock()
win._login_cancelled = threading.Event()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
win._greetd_sock = sock
# Should complete nearly instantly (no IPC calls)
import time
start = time.monotonic()
win._cancel_pending_session()
elapsed = time.monotonic() - start
assert elapsed < 0.1 # No blocking I/O
def test_worker_exits_silently_when_cancelled(self, tmp_path: Path) -> None:
"""_login_worker should exit without showing an error when cancelled mid-flight."""
from unittest.mock import MagicMock, patch
from moongreet.greeter import GreeterWindow
from moongreet.users import User
win = GreeterWindow.__new__(GreeterWindow)
win._greetd_sock_lock = threading.Lock()
win._login_cancelled = threading.Event()
win._greetd_sock = None
win._failed_attempts = {}
win._strings = MagicMock()
# Set cancelled before the worker runs
win._login_cancelled.set()
# Create a socket that will fail (simulating closed socket)
sock_path = tmp_path / "greetd.sock"
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(str(sock_path))
server.listen(1)
user = User(username="dom", uid=1000, gecos="Dominik", home=Path("/home/dom"), shell="/bin/zsh")
with patch("moongreet.greeter.GLib.idle_add") as mock_idle:
win._login_worker(user, "pw", MagicMock(exec_cmd="niri-session"), str(sock_path))
# Should NOT have scheduled any error callback
for call in mock_idle.call_args_list:
func = call[0][0]
assert func != win._on_login_error, "Worker should not show error when cancelled"
assert func != win._on_login_auth_error, "Worker should not show auth error when cancelled"
server.close()
class TestFaillockWarning:
"""Tests for the faillock warning message logic."""
def test_no_warning_on_zero_attempts(self) -> None:
strings = load_strings("de")
assert faillock_warning(0, strings) is None
def test_no_warning_on_first_attempt(self) -> None:
strings = load_strings("de")
assert faillock_warning(1, strings) is None
def test_warning_on_second_attempt(self) -> None:
strings = load_strings("de")
warning = faillock_warning(2, strings)
assert warning is not None
assert "1" in warning # 1 Versuch übrig
def test_warning_on_third_attempt(self) -> None:
strings = load_strings("de")
warning = faillock_warning(3, strings)
assert warning is not None
assert warning == strings.faillock_locked
def test_warning_beyond_max_attempts(self) -> None:
strings = load_strings("de")
warning = faillock_warning(4, strings)
assert warning is not None
assert warning == strings.faillock_locked
def test_max_attempts_constant_is_three(self) -> None:
assert FAILLOCK_MAX_ATTEMPTS == 3
class TestLastUser:
"""Tests for saving and loading the last logged-in user."""
def test_save_and_load_last_user(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "cache" / "moongreet" / "last-user"
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
GreeterWindow._save_last_user("dominik")
assert cache_path.exists()
assert cache_path.read_text() == "dominik"
result = GreeterWindow._load_last_user()
assert result == "dominik"
def test_load_last_user_missing_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "nonexistent" / "last-user"
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user()
assert result is None
def test_load_last_user_rejects_oversized_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "last-user"
cache_path.write_text("a" * 300)
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user()
assert result is None
def test_load_last_user_rejects_invalid_characters(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
cache_path = tmp_path / "last-user"
cache_path.write_text("../../etc/passwd")
monkeypatch.setattr("moongreet.greeter.LAST_USER_PATH", cache_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_user()
assert result is None
class TestLastSession:
"""Tests for saving and loading the last session per user."""
def test_save_and_load_last_session(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
from moongreet.greeter import GreeterWindow
GreeterWindow._save_last_session("dominik", "Niri")
session_file = tmp_path / "dominik"
assert session_file.exists()
assert session_file.read_text() == "Niri"
result = GreeterWindow._load_last_session("dominik")
assert result == "Niri"
def test_load_last_session_missing_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_session("nobody")
assert result is None
def test_load_last_session_rejects_oversized_name(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
(tmp_path / "dominik").write_text("A" * 300)
from moongreet.greeter import GreeterWindow
result = GreeterWindow._load_last_session("dominik")
assert result is None
def test_save_last_session_validates_username(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""Usernames with path traversal should not create files outside the cache dir."""
monkeypatch.setattr("moongreet.greeter.LAST_SESSION_DIR", tmp_path)
from moongreet.greeter import GreeterWindow
GreeterWindow._save_last_session("../../etc/evil", "Niri")
# Should not have created any file
assert not (tmp_path / "../../etc/evil").exists()
def test_regex_rejects_dot_dot_username(self) -> None:
"""Username '..' must not pass VALID_USERNAME validation."""
from moongreet.greeter import VALID_USERNAME
assert VALID_USERNAME.match("..") is None
def test_regex_rejects_dot_username(self) -> None:
"""Username '.' must not pass VALID_USERNAME validation."""
from moongreet.greeter import VALID_USERNAME
assert VALID_USERNAME.match(".") is None
def test_regex_allows_dot_in_middle(self) -> None:
"""Usernames like 'first.last' must still be valid."""
from moongreet.greeter import VALID_USERNAME
assert VALID_USERNAME.match("first.last") is not None
def test_regex_rejects_leading_dot(self) -> None:
"""Usernames starting with '.' are rejected (hidden files)."""
from moongreet.greeter import VALID_USERNAME
assert VALID_USERNAME.match(".hidden") is None

View File

@ -1,251 +0,0 @@
# ABOUTME: Tests for greetd IPC protocol — socket communication with length-prefixed JSON.
# ABOUTME: Uses mock sockets to verify message encoding/decoding and greetd request types.
import json
import struct
import socket
from unittest.mock import MagicMock, patch
import pytest
from moongreet.ipc import (
send_message,
recv_message,
create_session,
post_auth_response,
start_session,
cancel_session,
)
class FakeSocket:
"""A fake socket that records sent data and provides canned receive data."""
def __init__(self, recv_data: bytes = b""):
self.sent = bytearray()
self._recv_data = recv_data
self._recv_offset = 0
def sendall(self, data: bytes) -> None:
self.sent.extend(data)
def recv(self, n: int, flags: int = 0) -> bytes:
chunk = self._recv_data[self._recv_offset : self._recv_offset + n]
self._recv_offset += n
return chunk
@classmethod
def with_response(cls, response: dict) -> "FakeSocket":
"""Create a FakeSocket pre-loaded with a length-prefixed JSON response."""
payload = json.dumps(response).encode("utf-8")
data = struct.pack("=I", len(payload)) + payload
return cls(recv_data=data)
class FragmentingSocket:
"""A fake socket that delivers data in small chunks to simulate fragmentation."""
def __init__(self, data: bytes, chunk_size: int = 3):
self.sent = bytearray()
self._data = data
self._offset = 0
self._chunk_size = chunk_size
def sendall(self, data: bytes) -> None:
self.sent.extend(data)
def recv(self, n: int, flags: int = 0) -> bytes:
available = min(n, self._chunk_size, len(self._data) - self._offset)
if available <= 0:
return b""
chunk = self._data[self._offset : self._offset + available]
self._offset += available
return chunk
class TestSendMessage:
"""Tests for encoding and sending length-prefixed JSON messages."""
def test_sends_length_prefixed_json(self) -> None:
sock = FakeSocket()
msg = {"type": "create_session", "username": "testuser"}
send_message(sock, msg)
payload = json.dumps(msg).encode("utf-8")
expected = struct.pack("=I", len(payload)) + payload
assert bytes(sock.sent) == expected
def test_sends_empty_dict(self) -> None:
sock = FakeSocket()
send_message(sock, {})
payload = json.dumps({}).encode("utf-8")
expected = struct.pack("=I", len(payload)) + payload
assert bytes(sock.sent) == expected
def test_sends_nested_message(self) -> None:
sock = FakeSocket()
msg = {"type": "post_auth_message_response", "response": "secret123"}
send_message(sock, msg)
# Verify the payload is correctly length-prefixed
length_bytes = bytes(sock.sent[:4])
length = struct.unpack("=I", length_bytes)[0]
decoded = json.loads(sock.sent[4:])
assert length == len(json.dumps(msg).encode("utf-8"))
assert decoded == msg
def test_rejects_oversized_payload(self) -> None:
sock = FakeSocket()
msg = {"type": "huge", "data": "x" * 100000}
with pytest.raises(ValueError, match="Payload too large"):
send_message(sock, msg)
class TestRecvMessage:
"""Tests for receiving and decoding length-prefixed JSON messages."""
def test_receives_valid_message(self) -> None:
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = recv_message(sock)
assert result == response
def test_receives_complex_message(self) -> None:
response = {
"type": "auth_message",
"auth_message_type": "secret",
"auth_message": "Password:",
}
sock = FakeSocket.with_response(response)
result = recv_message(sock)
assert result == response
def test_raises_on_empty_recv(self) -> None:
sock = FakeSocket(recv_data=b"")
with pytest.raises(ConnectionError):
recv_message(sock)
def test_receives_fragmented_data(self) -> None:
"""recv() may return fewer bytes than requested — must loop."""
response = {"type": "success"}
payload = json.dumps(response).encode("utf-8")
data = struct.pack("=I", len(payload)) + payload
sock = FragmentingSocket(data, chunk_size=3)
result = recv_message(sock)
assert result == response
def test_rejects_oversized_payload(self) -> None:
"""Payloads exceeding the size limit must be rejected."""
header = struct.pack("=I", 10_000_000)
sock = FakeSocket(recv_data=header)
with pytest.raises(ConnectionError, match="too large"):
recv_message(sock)
class TestCreateSession:
"""Tests for the create_session greetd request."""
def test_sends_create_session_request(self) -> None:
response = {
"type": "auth_message",
"auth_message_type": "secret",
"auth_message": "Password:",
}
sock = FakeSocket.with_response(response)
result = create_session(sock, "dominik")
# Verify sent message
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {"type": "create_session", "username": "dominik"}
assert result == response
class TestPostAuthResponse:
"""Tests for posting authentication responses (passwords)."""
def test_sends_password_response(self) -> None:
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = post_auth_response(sock, "mypassword")
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {
"type": "post_auth_message_response",
"response": "mypassword",
}
assert result == response
def test_sends_none_response(self) -> None:
"""For auth types that don't require a response."""
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = post_auth_response(sock, None)
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {
"type": "post_auth_message_response",
"response": None,
}
class TestStartSession:
"""Tests for starting a session after authentication."""
def test_sends_start_session_request(self) -> None:
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = start_session(sock, ["Hyprland"])
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {"type": "start_session", "cmd": ["Hyprland"]}
assert result == response
def test_sends_multi_arg_command(self) -> None:
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = start_session(sock, ["sway", "--config", "/etc/sway/config"])
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {
"type": "start_session",
"cmd": ["sway", "--config", "/etc/sway/config"],
}
class TestCancelSession:
"""Tests for cancelling an in-progress session."""
def test_sends_cancel_session_request(self) -> None:
response = {"type": "success"}
sock = FakeSocket.with_response(response)
result = cancel_session(sock)
length = struct.unpack("=I", bytes(sock.sent[:4]))[0]
sent_msg = json.loads(sock.sent[4 : 4 + length])
assert sent_msg == {"type": "cancel_session"}
assert result == response

View File

@ -1,61 +0,0 @@
# ABOUTME: Tests for power actions — reboot and shutdown via loginctl.
# ABOUTME: Uses mocking to avoid actually calling system commands.
import subprocess
from unittest.mock import patch, call
import pytest
from moongreet.power import reboot, shutdown, POWER_TIMEOUT
class TestReboot:
"""Tests for the reboot power action."""
@patch("moongreet.power.subprocess.run")
def test_calls_loginctl_reboot(self, mock_run) -> None:
reboot()
mock_run.assert_called_once_with(
["loginctl", "reboot"], check=True, timeout=POWER_TIMEOUT
)
@patch("moongreet.power.subprocess.run")
def test_raises_on_failure(self, mock_run) -> None:
mock_run.side_effect = subprocess.CalledProcessError(1, "loginctl")
with pytest.raises(subprocess.CalledProcessError):
reboot()
@patch("moongreet.power.subprocess.run")
def test_raises_on_timeout(self, mock_run) -> None:
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
with pytest.raises(subprocess.TimeoutExpired):
reboot()
class TestShutdown:
"""Tests for the shutdown power action."""
@patch("moongreet.power.subprocess.run")
def test_calls_loginctl_poweroff(self, mock_run) -> None:
shutdown()
mock_run.assert_called_once_with(
["loginctl", "poweroff"], check=True, timeout=POWER_TIMEOUT
)
@patch("moongreet.power.subprocess.run")
def test_raises_on_failure(self, mock_run) -> None:
mock_run.side_effect = subprocess.CalledProcessError(1, "loginctl")
with pytest.raises(subprocess.CalledProcessError):
shutdown()
@patch("moongreet.power.subprocess.run")
def test_raises_on_timeout(self, mock_run) -> None:
mock_run.side_effect = subprocess.TimeoutExpired("loginctl", POWER_TIMEOUT)
with pytest.raises(subprocess.TimeoutExpired):
shutdown()

View File

@ -1,104 +0,0 @@
# ABOUTME: Tests for session detection — parsing .desktop files from wayland/xsessions dirs.
# ABOUTME: Uses temporary directories to simulate session file locations.
from pathlib import Path
import pytest
from moongreet.sessions import Session, get_sessions
class TestGetSessions:
"""Tests for discovering available sessions from .desktop files."""
def test_finds_wayland_session(self, tmp_path: Path) -> None:
wayland_dir = tmp_path / "wayland-sessions"
wayland_dir.mkdir()
desktop = wayland_dir / "hyprland.desktop"
desktop.write_text(
"[Desktop Entry]\n"
"Name=Hyprland\n"
"Exec=Hyprland\n"
"Type=Application\n"
)
sessions = get_sessions(wayland_dirs=[wayland_dir], xsession_dirs=[])
assert len(sessions) == 1
assert sessions[0].name == "Hyprland"
assert sessions[0].exec_cmd == "Hyprland"
assert sessions[0].session_type == "wayland"
def test_finds_xsession(self, tmp_path: Path) -> None:
x_dir = tmp_path / "xsessions"
x_dir.mkdir()
desktop = x_dir / "i3.desktop"
desktop.write_text(
"[Desktop Entry]\n"
"Name=i3\n"
"Exec=i3\n"
"Type=Application\n"
)
sessions = get_sessions(wayland_dirs=[], xsession_dirs=[x_dir])
assert len(sessions) == 1
assert sessions[0].session_type == "x11"
def test_finds_sessions_from_multiple_dirs(self, tmp_path: Path) -> None:
wayland_dir = tmp_path / "wayland-sessions"
wayland_dir.mkdir()
(wayland_dir / "sway.desktop").write_text(
"[Desktop Entry]\nName=Sway\nExec=sway\n"
)
x_dir = tmp_path / "xsessions"
x_dir.mkdir()
(x_dir / "openbox.desktop").write_text(
"[Desktop Entry]\nName=Openbox\nExec=openbox-session\n"
)
sessions = get_sessions(wayland_dirs=[wayland_dir], xsession_dirs=[x_dir])
names = {s.name for s in sessions}
assert names == {"Sway", "Openbox"}
def test_returns_empty_for_no_sessions(self, tmp_path: Path) -> None:
empty = tmp_path / "empty"
sessions = get_sessions(wayland_dirs=[empty], xsession_dirs=[empty])
assert sessions == []
def test_skips_files_without_name(self, tmp_path: Path) -> None:
wayland_dir = tmp_path / "wayland-sessions"
wayland_dir.mkdir()
(wayland_dir / "broken.desktop").write_text(
"[Desktop Entry]\nExec=something\n"
)
sessions = get_sessions(wayland_dirs=[wayland_dir], xsession_dirs=[])
assert sessions == []
def test_skips_files_without_exec(self, tmp_path: Path) -> None:
wayland_dir = tmp_path / "wayland-sessions"
wayland_dir.mkdir()
(wayland_dir / "noexec.desktop").write_text(
"[Desktop Entry]\nName=NoExec\n"
)
sessions = get_sessions(wayland_dirs=[wayland_dir], xsession_dirs=[])
assert sessions == []
def test_handles_exec_with_arguments(self, tmp_path: Path) -> None:
wayland_dir = tmp_path / "wayland-sessions"
wayland_dir.mkdir()
(wayland_dir / "sway.desktop").write_text(
"[Desktop Entry]\nName=Sway\nExec=sway --config /etc/sway/config\n"
)
sessions = get_sessions(wayland_dirs=[wayland_dir], xsession_dirs=[])
assert sessions[0].exec_cmd == "sway --config /etc/sway/config"

View File

@ -1,215 +0,0 @@
# ABOUTME: Tests for user detection — parsing /etc/passwd, avatar lookup, GTK theme reading.
# ABOUTME: Uses temporary files and mocking to avoid system dependencies.
from pathlib import Path
from dataclasses import dataclass
import pytest
from moongreet.users import User, get_users, get_avatar_path, get_user_gtk_theme
class TestGetUsers:
"""Tests for parsing /etc/passwd to find login users."""
def test_returns_users_in_uid_range(self, tmp_path: Path) -> None:
passwd = tmp_path / "passwd"
passwd.write_text(
"root:x:0:0:root:/root:/bin/bash\n"
"nobody:x:65534:65534:Nobody:/:/usr/bin/nologin\n"
"dominik:x:1000:1000:Dominik:/home/dominik:/bin/zsh\n"
"testuser:x:1001:1001:Test User:/home/testuser:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 2
assert users[0].username == "dominik"
assert users[0].uid == 1000
assert users[0].gecos == "Dominik"
assert users[0].home == Path("/home/dominik")
assert users[1].username == "testuser"
def test_excludes_nologin_shells(self, tmp_path: Path) -> None:
passwd = tmp_path / "passwd"
passwd.write_text(
"systemuser:x:1000:1000:System:/home/system:/usr/sbin/nologin\n"
"falseuser:x:1001:1001:False:/home/false:/bin/false\n"
"realuser:x:1002:1002:Real:/home/real:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].username == "realuser"
def test_returns_empty_for_no_matching_users(self, tmp_path: Path) -> None:
passwd = tmp_path / "passwd"
passwd.write_text("root:x:0:0:root:/root:/bin/bash\n")
users = get_users(passwd_path=passwd)
assert users == []
def test_handles_missing_gecos_field(self, tmp_path: Path) -> None:
passwd = tmp_path / "passwd"
passwd.write_text("user:x:1000:1000::/home/user:/bin/bash\n")
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].gecos == ""
assert users[0].display_name == "user"
def test_skips_invalid_uid(self, tmp_path: Path) -> None:
"""Corrupt /etc/passwd with non-numeric UID should not crash."""
passwd = tmp_path / "passwd"
passwd.write_text(
"corrupt:x:NOTANUMBER:1000:Corrupt:/home/corrupt:/bin/bash\n"
"valid:x:1000:1000:Valid:/home/valid:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].username == "valid"
def test_skips_username_with_slash(self, tmp_path: Path) -> None:
"""Usernames containing path separators should be rejected."""
passwd = tmp_path / "passwd"
passwd.write_text(
"../evil:x:1000:1000:Evil:/home/evil:/bin/bash\n"
"normal:x:1001:1001:Normal:/home/normal:/bin/bash\n"
)
users = get_users(passwd_path=passwd)
assert len(users) == 1
assert users[0].username == "normal"
class TestGetAvatarPath:
"""Tests for avatar file lookup."""
def test_finds_accountsservice_icon(self, tmp_path: Path) -> None:
icons_dir = tmp_path / "icons"
icons_dir.mkdir()
avatar = icons_dir / "dominik"
avatar.write_bytes(b"PNG")
result = get_avatar_path("dominik", accountsservice_dir=icons_dir)
assert result == avatar
def test_falls_back_to_dot_face(self, tmp_path: Path) -> None:
home = tmp_path / "home" / "dominik"
home.mkdir(parents=True)
face = home / ".face"
face.write_bytes(b"PNG")
empty_icons = tmp_path / "no_icons"
result = get_avatar_path(
"dominik", accountsservice_dir=empty_icons, home_dir=home
)
assert result == face
def test_ignores_symlinked_face(self, tmp_path: Path) -> None:
"""~/.face as symlink should be ignored to prevent traversal."""
home = tmp_path / "home" / "attacker"
home.mkdir(parents=True)
target = tmp_path / "secret.txt"
target.write_text("sensitive data")
face = home / ".face"
face.symlink_to(target)
empty_icons = tmp_path / "no_icons"
result = get_avatar_path(
"attacker", accountsservice_dir=empty_icons, home_dir=home
)
assert result is None
def test_returns_none_when_no_avatar(self, tmp_path: Path) -> None:
empty_icons = tmp_path / "no_icons"
home = tmp_path / "home" / "nobody"
result = get_avatar_path(
"nobody", accountsservice_dir=empty_icons, home_dir=home
)
assert result is None
class TestGetUserGtkTheme:
"""Tests for reading GTK theme from user's settings.ini."""
def test_reads_theme_from_settings(self, tmp_path: Path) -> None:
gtk_dir = tmp_path / ".config" / "gtk-4.0"
gtk_dir.mkdir(parents=True)
settings = gtk_dir / "settings.ini"
settings.write_text(
"[Settings]\n"
"gtk-theme-name=Adwaita-dark\n"
"gtk-icon-theme-name=Papirus\n"
)
result = get_user_gtk_theme(config_dir=gtk_dir)
assert result == "Adwaita-dark"
def test_returns_none_when_no_settings(self, tmp_path: Path) -> None:
gtk_dir = tmp_path / "nonexistent"
result = get_user_gtk_theme(config_dir=gtk_dir)
assert result is None
def test_returns_none_when_no_theme_key(self, tmp_path: Path) -> None:
gtk_dir = tmp_path / ".config" / "gtk-4.0"
gtk_dir.mkdir(parents=True)
settings = gtk_dir / "settings.ini"
settings.write_text("[Settings]\ngtk-icon-theme-name=Papirus\n")
result = get_user_gtk_theme(config_dir=gtk_dir)
assert result is None
def test_returns_none_for_corrupt_settings_ini(self, tmp_path: Path) -> None:
"""settings.ini without section header should not crash."""
gtk_dir = tmp_path / ".config" / "gtk-4.0"
gtk_dir.mkdir(parents=True)
settings = gtk_dir / "settings.ini"
settings.write_text("gtk-theme-name=Adwaita-dark\n")
result = get_user_gtk_theme(config_dir=gtk_dir)
assert result is None
def test_passes_theme_with_special_characters(self, tmp_path: Path) -> None:
"""Theme names with special characters are passed through to GTK."""
gtk_dir = tmp_path / ".config" / "gtk-4.0"
gtk_dir.mkdir(parents=True)
settings = gtk_dir / "settings.ini"
settings.write_text(
"[Settings]\ngtk-theme-name=catppuccin-mocha-lavender-standard+default\n"
)
result = get_user_gtk_theme(config_dir=gtk_dir)
assert result == "catppuccin-mocha-lavender-standard+default"
def test_ignores_symlinked_accountsservice_icon(self, tmp_path: Path) -> None:
"""AccountsService icon as symlink should be ignored to prevent traversal."""
icons_dir = tmp_path / "icons"
icons_dir.mkdir()
target = tmp_path / "secret.txt"
target.write_text("sensitive data")
icon = icons_dir / "attacker"
icon.symlink_to(target)
result = get_avatar_path(
"attacker", accountsservice_dir=icons_dir
)
assert result is None

45
uv.lock generated
View File

@ -1,45 +0,0 @@
version = 1
revision = 3
requires-python = ">=3.11"
[[package]]
name = "moongreet"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "pygobject" },
]
[package.metadata]
requires-dist = [{ name = "pygobject", specifier = ">=3.46" }]
[[package]]
name = "pycairo"
version = "1.29.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/22/d9/1728840a22a4ef8a8f479b9156aa2943cd98c3907accd3849fb0d5f82bfd/pycairo-1.29.0.tar.gz", hash = "sha256:f3f7fde97325cae80224c09f12564ef58d0d0f655da0e3b040f5807bd5bd3142", size = 665871, upload-time = "2025-11-11T19:13:01.584Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/31/92/1b904087e831806a449502786d47d3a468e5edb8f65755f6bd88e8038e53/pycairo-1.29.0-cp311-cp311-win32.whl", hash = "sha256:12757ebfb304b645861283c20585c9204c3430671fad925419cba04844d6dfed", size = 751342, upload-time = "2025-11-11T19:11:37.386Z" },
{ url = "https://files.pythonhosted.org/packages/db/09/a0ab6a246a7ede89e817d749a941df34f27a74bedf15551da51e86ae105e/pycairo-1.29.0-cp311-cp311-win_amd64.whl", hash = "sha256:3391532db03f9601c1cee9ebfa15b7d1db183c6020f3e75c1348cee16825934f", size = 845036, upload-time = "2025-11-11T19:11:43.408Z" },
{ url = "https://files.pythonhosted.org/packages/3c/b2/bf455454bac50baef553e7356d36b9d16e482403bf132cfb12960d2dc2e7/pycairo-1.29.0-cp311-cp311-win_arm64.whl", hash = "sha256:b69be8bb65c46b680771dc6a1a422b1cdd0cffb17be548f223e8cbbb6205567c", size = 694644, upload-time = "2025-11-11T19:11:48.599Z" },
{ url = "https://files.pythonhosted.org/packages/f6/28/6363087b9e60af031398a6ee5c248639eefc6cc742884fa2789411b1f73b/pycairo-1.29.0-cp312-cp312-win32.whl", hash = "sha256:91bcd7b5835764c616a615d9948a9afea29237b34d2ed013526807c3d79bb1d0", size = 751486, upload-time = "2025-11-11T19:11:54.451Z" },
{ url = "https://files.pythonhosted.org/packages/3a/d2/d146f1dd4ef81007686ac52231dd8f15ad54cf0aa432adaefc825475f286/pycairo-1.29.0-cp312-cp312-win_amd64.whl", hash = "sha256:3f01c3b5e49ef9411fff6bc7db1e765f542dc1c9cfed4542958a5afa3a8b8e76", size = 845383, upload-time = "2025-11-11T19:12:01.551Z" },
{ url = "https://files.pythonhosted.org/packages/01/16/6e6f33bb79ec4a527c9e633915c16dc55a60be26b31118dbd0d5859e8c51/pycairo-1.29.0-cp312-cp312-win_arm64.whl", hash = "sha256:eafe3d2076f3533535ad4a361fa0754e0ee66b90e548a3a0f558fed00b1248f2", size = 694518, upload-time = "2025-11-11T19:12:06.561Z" },
{ url = "https://files.pythonhosted.org/packages/f0/21/3f477dc318dd4e84a5ae6301e67284199d7e5a2384f3063714041086b65d/pycairo-1.29.0-cp313-cp313-win32.whl", hash = "sha256:3eb382a4141591807073274522f7aecab9e8fa2f14feafd11ac03a13a58141d7", size = 750949, upload-time = "2025-11-11T19:12:12.198Z" },
{ url = "https://files.pythonhosted.org/packages/43/34/7d27a333c558d6ac16dbc12a35061d389735e99e494ee4effa4ec6d99bed/pycairo-1.29.0-cp313-cp313-win_amd64.whl", hash = "sha256:91114e4b3fbf4287c2b0788f83e1f566ce031bda49cf1c3c3c19c3e986e95c38", size = 844149, upload-time = "2025-11-11T19:12:19.171Z" },
{ url = "https://files.pythonhosted.org/packages/15/43/e782131e23df69e5c8e631a016ed84f94bbc4981bf6411079f57af730a23/pycairo-1.29.0-cp313-cp313-win_arm64.whl", hash = "sha256:09b7f69a5ff6881e151354ea092137b97b0b1f0b2ab4eb81c92a02cc4a08e335", size = 693595, upload-time = "2025-11-11T19:12:23.445Z" },
{ url = "https://files.pythonhosted.org/packages/2d/fa/87eaeeb9d53344c769839d7b2854db7ff2cd596211e00dd1b702eeb1838f/pycairo-1.29.0-cp314-cp314-win32.whl", hash = "sha256:69e2a7968a3fbb839736257bae153f547bca787113cc8d21e9e08ca4526e0b6b", size = 767198, upload-time = "2025-11-11T19:12:42.336Z" },
{ url = "https://files.pythonhosted.org/packages/3c/90/3564d0f64d0a00926ab863dc3c4a129b1065133128e96900772e1c4421f8/pycairo-1.29.0-cp314-cp314-win_amd64.whl", hash = "sha256:e91243437a21cc4c67c401eff4433eadc45745275fa3ade1a0d877e50ffb90da", size = 871579, upload-time = "2025-11-11T19:12:48.982Z" },
{ url = "https://files.pythonhosted.org/packages/5e/91/93632b6ba12ad69c61991e3208bde88486fdfc152be8cfdd13444e9bc650/pycairo-1.29.0-cp314-cp314-win_arm64.whl", hash = "sha256:b72200ea0e5f73ae4c788cd2028a750062221385eb0e6d8f1ecc714d0b4fdf82", size = 719537, upload-time = "2025-11-11T19:12:55.016Z" },
{ url = "https://files.pythonhosted.org/packages/93/23/37053c039f8d3b9b5017af9bc64d27b680c48a898d48b72e6d6583cf0155/pycairo-1.29.0-cp314-cp314t-win_amd64.whl", hash = "sha256:5e45fce6185f553e79e4ef1722b8e98e6cde9900dbc48cb2637a9ccba86f627a", size = 874015, upload-time = "2025-11-11T19:12:28.47Z" },
{ url = "https://files.pythonhosted.org/packages/d7/54/123f6239685f5f3f2edc123f1e38d2eefacebee18cf3c532d2f4bd51d0ef/pycairo-1.29.0-cp314-cp314t-win_arm64.whl", hash = "sha256:caba0837a4b40d47c8dfb0f24cccc12c7831e3dd450837f2a356c75f21ce5a15", size = 721404, upload-time = "2025-11-11T19:12:36.919Z" },
]
[[package]]
name = "pygobject"
version = "3.56.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pycairo" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a2/80/09247a2be28af2c2240132a0af6c1005a2b1d089242b13a2cd782d2de8d7/pygobject-3.56.2.tar.gz", hash = "sha256:b816098969544081de9eecedb94ad6ac59c77e4d571fe7051f18bebcec074313", size = 1409059, upload-time = "2026-03-25T16:14:04.008Z" }