mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-03-22 07:06:47 +00:00
98 lines
3.3 KiB
Python
98 lines
3.3 KiB
Python
"""Tests for admin user creation logic (used by create_admin CLI tool)."""
|
|
|
|
import pytest
|
|
|
|
from reflector.auth.password_utils import hash_password, verify_password
|
|
from reflector.db.users import user_controller
|
|
from reflector.utils import generate_uuid4
|
|
|
|
|
|
async def _provision_admin(email: str, password: str):
|
|
"""Mirrors the logic in create_admin.create_admin() without managing DB connections."""
|
|
password_hash = hash_password(password)
|
|
|
|
existing = await user_controller.get_by_email(email)
|
|
if existing:
|
|
await user_controller.set_password_hash(existing.id, password_hash)
|
|
else:
|
|
await user_controller.create_or_update(
|
|
id=generate_uuid4(),
|
|
authentik_uid=f"local:{email}",
|
|
email=email,
|
|
password_hash=password_hash,
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_admin_new_user(setup_database):
|
|
await _provision_admin("newadmin@test.com", "password123")
|
|
|
|
user = await user_controller.get_by_email("newadmin@test.com")
|
|
assert user is not None
|
|
assert user.email == "newadmin@test.com"
|
|
assert user.authentik_uid == "local:newadmin@test.com"
|
|
assert user.password_hash is not None
|
|
assert verify_password("password123", user.password_hash)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_admin_updates_existing(setup_database):
|
|
# Create first
|
|
await _provision_admin("admin@test.com", "oldpassword")
|
|
user1 = await user_controller.get_by_email("admin@test.com")
|
|
|
|
# Update password
|
|
await _provision_admin("admin@test.com", "newpassword")
|
|
user2 = await user_controller.get_by_email("admin@test.com")
|
|
|
|
assert user1.id == user2.id # same user, not duplicated
|
|
assert verify_password("newpassword", user2.password_hash)
|
|
assert not verify_password("oldpassword", user2.password_hash)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_admin_idempotent(setup_database):
|
|
await _provision_admin("admin@test.com", "samepassword")
|
|
await _provision_admin("admin@test.com", "samepassword")
|
|
|
|
# Should only have one user
|
|
users = await user_controller.list_all()
|
|
admin_users = [u for u in users if u.email == "admin@test.com"]
|
|
assert len(admin_users) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_or_update_with_password_hash(setup_database):
|
|
"""Test the extended create_or_update method with password_hash parameter."""
|
|
pw_hash = hash_password("test123")
|
|
user = await user_controller.create_or_update(
|
|
id=generate_uuid4(),
|
|
authentik_uid="local:test@example.com",
|
|
email="test@example.com",
|
|
password_hash=pw_hash,
|
|
)
|
|
|
|
assert user.password_hash == pw_hash
|
|
|
|
fetched = await user_controller.get_by_email("test@example.com")
|
|
assert fetched is not None
|
|
assert verify_password("test123", fetched.password_hash)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_set_password_hash(setup_database):
|
|
"""Test the set_password_hash method."""
|
|
user = await user_controller.create_or_update(
|
|
id=generate_uuid4(),
|
|
authentik_uid="local:pw@test.com",
|
|
email="pw@test.com",
|
|
)
|
|
assert user.password_hash is None
|
|
|
|
pw_hash = hash_password("newpass")
|
|
await user_controller.set_password_hash(user.id, pw_hash)
|
|
|
|
updated = await user_controller.get_by_email("pw@test.com")
|
|
assert updated is not None
|
|
assert verify_password("newpass", updated.password_hash)
|