Files
reflector/server/reflector/db/users.py
Sergey Mankovsky 9bec39808f feat: link transcript participants (#737)
* Sync authentik users

* Migrate user_id from uid to id

* Fix auth user id

* Fix ci migration test

* Fix meeting token creation

* Move user id migration to a script

* Add user on first login

* Fix migration chain

* Rename uid column to authentik_uid

* Fix broken ws test
2025-11-25 19:13:19 +01:00

93 lines
3.3 KiB
Python

"""User table for storing Authentik user information."""
from datetime import datetime, timezone
import sqlalchemy
from pydantic import BaseModel, Field
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
users = sqlalchemy.Table(
"user",
metadata,
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("email", sqlalchemy.String, nullable=False),
sqlalchemy.Column("authentik_uid", sqlalchemy.String, nullable=False),
sqlalchemy.Column("created_at", sqlalchemy.DateTime(timezone=True), nullable=False),
sqlalchemy.Column("updated_at", sqlalchemy.DateTime(timezone=True), nullable=False),
sqlalchemy.Index("idx_user_authentik_uid", "authentik_uid", unique=True),
sqlalchemy.Index("idx_user_email", "email", unique=False),
)
class User(BaseModel):
id: NonEmptyString = Field(default_factory=generate_uuid4)
email: NonEmptyString
authentik_uid: NonEmptyString
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class UserController:
@staticmethod
async def get_by_id(user_id: NonEmptyString) -> User | None:
query = users.select().where(users.c.id == user_id)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def get_by_authentik_uid(authentik_uid: NonEmptyString) -> User | None:
query = users.select().where(users.c.authentik_uid == authentik_uid)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def get_by_email(email: NonEmptyString) -> User | None:
query = users.select().where(users.c.email == email)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def create_or_update(
id: NonEmptyString, authentik_uid: NonEmptyString, email: NonEmptyString
) -> User:
existing = await UserController.get_by_authentik_uid(authentik_uid)
now = datetime.now(timezone.utc)
if existing:
query = (
users.update()
.where(users.c.authentik_uid == authentik_uid)
.values(email=email, updated_at=now)
)
await get_database().execute(query)
return User(
id=existing.id,
authentik_uid=authentik_uid,
email=email,
created_at=existing.created_at,
updated_at=now,
)
else:
user = User(
id=id,
authentik_uid=authentik_uid,
email=email,
created_at=now,
updated_at=now,
)
query = users.insert().values(**user.model_dump())
await get_database().execute(query)
return user
@staticmethod
async def list_all() -> list[User]:
query = users.select().order_by(users.c.created_at.desc())
results = await get_database().fetch_all(query)
return [User(**r) for r in results]
user_controller = UserController()