diff --git a/server/migrations/versions/bbafedfa510c_add_user_table.py b/server/migrations/versions/bbafedfa510c_add_user_table.py new file mode 100644 index 00000000..2078184a --- /dev/null +++ b/server/migrations/versions/bbafedfa510c_add_user_table.py @@ -0,0 +1,38 @@ +"""add user table + +Revision ID: bbafedfa510c +Revises: 5d6b9df9b045 +Create Date: 2025-11-19 21:06:30.543262 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "bbafedfa510c" +down_revision: Union[str, None] = "5d6b9df9b045" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "user", + sa.Column("id", sa.String(), nullable=False), + sa.Column("email", sa.String(), nullable=False), + sa.Column("authentik_uid", sa.String(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + + with op.batch_alter_table("user", schema=None) as batch_op: + batch_op.create_index("idx_user_authentik_uid", ["authentik_uid"], unique=True) + batch_op.create_index("idx_user_email", ["email"], unique=False) + + +def downgrade() -> None: + op.drop_table("user") diff --git a/server/reflector/auth/auth_jwt.py b/server/reflector/auth/auth_jwt.py index 0dcff9a0..625a371b 100644 --- a/server/reflector/auth/auth_jwt.py +++ b/server/reflector/auth/auth_jwt.py @@ -6,8 +6,10 @@ from jose import JWTError, jwt from pydantic import BaseModel from reflector.db.user_api_keys import user_api_keys_controller +from reflector.db.users import user_controller from reflector.logger import logger from reflector.settings import settings +from reflector.utils import generate_uuid4 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) @@ -74,9 +76,21 @@ async def _authenticate_user( if jwt_token: try: payload = jwtauth.verify_token(jwt_token) - sub = payload["sub"] + authentik_uid = payload["sub"] email = payload["email"] - user_infos.append(UserInfo(sub=sub, email=email)) + + user = await user_controller.get_by_authentik_uid(authentik_uid) + if not user: + logger.info( + f"Creating new user on first login: {authentik_uid} ({email})" + ) + user = await user_controller.create_or_update( + id=generate_uuid4(), + authentik_uid=authentik_uid, + email=email, + ) + + user_infos.append(UserInfo(sub=user.id, email=email)) except JWTError as e: logger.error(f"JWT error: {e}") raise HTTPException(status_code=401, detail="Invalid authentication") diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py index 91ed12ee..deffb52a 100644 --- a/server/reflector/db/__init__.py +++ b/server/reflector/db/__init__.py @@ -31,6 +31,7 @@ import reflector.db.recordings # noqa import reflector.db.rooms # noqa import reflector.db.transcripts # noqa import reflector.db.user_api_keys # noqa +import reflector.db.users # noqa kwargs = {} if "postgres" not in settings.DATABASE_URL: diff --git a/server/reflector/db/users.py b/server/reflector/db/users.py new file mode 100644 index 00000000..ccbe11d6 --- /dev/null +++ b/server/reflector/db/users.py @@ -0,0 +1,92 @@ +"""User table for storing Authentik user information.""" + +from datetime import datetime, timezone + +import sqlalchemy +from pydantic import BaseModel, Field + +from reflector.db import get_database, metadata +from reflector.utils import generate_uuid4 +from reflector.utils.string import NonEmptyString + +users = sqlalchemy.Table( + "user", + metadata, + sqlalchemy.Column("id", sqlalchemy.String, primary_key=True), + sqlalchemy.Column("email", sqlalchemy.String, nullable=False), + sqlalchemy.Column("authentik_uid", sqlalchemy.String, nullable=False), + sqlalchemy.Column("created_at", sqlalchemy.DateTime(timezone=True), nullable=False), + sqlalchemy.Column("updated_at", sqlalchemy.DateTime(timezone=True), nullable=False), + sqlalchemy.Index("idx_user_authentik_uid", "authentik_uid", unique=True), + sqlalchemy.Index("idx_user_email", "email", unique=False), +) + + +class User(BaseModel): + id: NonEmptyString = Field(default_factory=generate_uuid4) + email: NonEmptyString + authentik_uid: NonEmptyString + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class UserController: + @staticmethod + async def get_by_id(user_id: NonEmptyString) -> User | None: + query = users.select().where(users.c.id == user_id) + result = await get_database().fetch_one(query) + return User(**result) if result else None + + @staticmethod + async def get_by_authentik_uid(authentik_uid: NonEmptyString) -> User | None: + query = users.select().where(users.c.authentik_uid == authentik_uid) + result = await get_database().fetch_one(query) + return User(**result) if result else None + + @staticmethod + async def get_by_email(email: NonEmptyString) -> User | None: + query = users.select().where(users.c.email == email) + result = await get_database().fetch_one(query) + return User(**result) if result else None + + @staticmethod + async def create_or_update( + id: NonEmptyString, authentik_uid: NonEmptyString, email: NonEmptyString + ) -> User: + existing = await UserController.get_by_authentik_uid(authentik_uid) + now = datetime.now(timezone.utc) + + if existing: + query = ( + users.update() + .where(users.c.authentik_uid == authentik_uid) + .values(email=email, updated_at=now) + ) + await get_database().execute(query) + return User( + id=existing.id, + authentik_uid=authentik_uid, + email=email, + created_at=existing.created_at, + updated_at=now, + ) + else: + user = User( + id=id, + authentik_uid=authentik_uid, + email=email, + created_at=now, + updated_at=now, + ) + query = users.insert().values(**user.model_dump()) + await get_database().execute(query) + return user + + @staticmethod + async def list_all() -> list[User]: + query = users.select().order_by(users.c.created_at.desc()) + results = await get_database().fetch_all(query) + return [User(**r) for r in results] + + +user_controller = UserController() diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index baafaffe..6d1ba358 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -337,19 +337,7 @@ async def rooms_create_meeting( status_code=503, detail="Meeting creation in progress, please try again" ) - if meeting.platform == "daily" and room.recording_trigger != "none": - client = create_platform_client(meeting.platform) - token = await client.create_meeting_token( - meeting.room_name, - enable_recording=True, - user_id=user_id, - ) - meeting = meeting.model_copy() - meeting.room_url = add_query_param(meeting.room_url, "t", token) - if meeting.host_room_url: - meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token) - - if user_id != room.user_id: + if user_id != room.user_id and meeting.platform == "whereby": meeting.host_room_url = "" return meeting @@ -508,7 +496,8 @@ async def rooms_list_active_meetings( if user_id != room.user_id: for meeting in meetings: - meeting.host_room_url = "" + if meeting.platform == "whereby": + meeting.host_room_url = "" return meetings @@ -530,7 +519,7 @@ async def rooms_get_meeting( if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") - if user_id != room.user_id and not room.is_shared: + if user_id != room.user_id and not room.is_shared and meeting.platform == "whereby": meeting.host_room_url = "" return meeting @@ -560,7 +549,20 @@ async def rooms_join_meeting( if meeting.end_date <= current_time: raise HTTPException(status_code=400, detail="Meeting has ended") - if user_id != room.user_id: + if meeting.platform == "daily": + client = create_platform_client(meeting.platform) + enable_recording = room.recording_trigger != "none" + token = await client.create_meeting_token( + meeting.room_name, + enable_recording=enable_recording, + user_id=user_id, + ) + meeting = meeting.model_copy() + meeting.room_url = add_query_param(meeting.room_url, "t", token) + if meeting.host_room_url: + meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token) + + if user_id != room.user_id and meeting.platform == "whereby": meeting.host_room_url = "" return meeting diff --git a/server/reflector/views/user_websocket.py b/server/reflector/views/user_websocket.py index 26d3c8ac..b556f4c4 100644 --- a/server/reflector/views/user_websocket.py +++ b/server/reflector/views/user_websocket.py @@ -3,6 +3,7 @@ from typing import Optional from fastapi import APIRouter, WebSocket from reflector.auth.auth_jwt import JWTAuth # type: ignore +from reflector.db.users import user_controller from reflector.ws_manager import get_ws_manager router = APIRouter() @@ -29,7 +30,18 @@ async def user_events_websocket(websocket: WebSocket): try: payload = JWTAuth().verify_token(token) - user_id = payload.get("sub") + authentik_uid = payload.get("sub") + + if authentik_uid: + user = await user_controller.get_by_authentik_uid(authentik_uid) + if user: + user_id = user.id + else: + await websocket.close(code=UNAUTHORISED) + return + else: + await websocket.close(code=UNAUTHORISED) + return except Exception: await websocket.close(code=UNAUTHORISED) return diff --git a/server/scripts/migrate_user_ids.py b/server/scripts/migrate_user_ids.py new file mode 100755 index 00000000..4fcffe71 --- /dev/null +++ b/server/scripts/migrate_user_ids.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +""" +Manual Migration Script: Migrate user_id from Authentik UID to internal user.id + +This script should be run manually AFTER applying the database schema migrations. + +Usage: + AUTHENTIK_API_URL=https://your-authentik-url \ + AUTHENTIK_API_TOKEN=your-token \ + DATABASE_URL=postgresql://... \ + python scripts/migrate_user_ids.py + +What this script does: +1. Collects all unique Authentik UIDs currently used in the database +2. Fetches only those users from Authentik API to populate the users table +3. Updates user_id in: user_api_key, transcript, room, meeting_consent +4. Uses user.authentik_uid to lookup the corresponding user.id + +The script is idempotent: +- User inserts use ON CONFLICT DO NOTHING (safe if users already exist) +- Update queries only match authentik_uid->uuid pairs (no-op if already migrated) +- Safe to run multiple times without side effects + +Prerequisites: +- AUTHENTIK_API_URL environment variable must be set +- AUTHENTIK_API_TOKEN environment variable must be set +- DATABASE_URL environment variable must be set +- Authentik API must be accessible +""" + +import asyncio +import os +import sys +from datetime import datetime, timezone +from typing import Any + +import httpx +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine + +TABLES_WITH_USER_ID = ["user_api_key", "transcript", "room", "meeting_consent"] +NULLABLE_USER_ID_TABLES = {"transcript", "meeting_consent"} +AUTHENTIK_PAGE_SIZE = 100 +HTTP_TIMEOUT = 30.0 + + +class AuthentikClient: + def __init__(self, api_url: str, api_token: str): + self.api_url = api_url + self.api_token = api_token + + def _get_headers(self) -> dict[str, str]: + return { + "Authorization": f"Bearer {self.api_token}", + "Accept": "application/json", + } + + async def fetch_all_users(self) -> list[dict[str, Any]]: + all_users = [] + page = 1 + + try: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + while True: + url = f"{self.api_url}/api/v3/core/users/" + params = { + "page": page, + "page_size": AUTHENTIK_PAGE_SIZE, + "include_groups": "false", + } + + print(f" Fetching users from Authentik (page {page})...") + response = await client.get( + url, headers=self._get_headers(), params=params + ) + response.raise_for_status() + data = response.json() + + results = data.get("results", []) + if not results: + break + + all_users.extend(results) + print(f" Fetched {len(results)} users from page {page}") + + if not data.get("next"): + break + + page += 1 + + print(f" Total: {len(all_users)} users fetched from Authentik") + return all_users + + except httpx.HTTPError as e: + raise Exception(f"Failed to fetch users from Authentik: {e}") from e + + +async def collect_used_authentik_uids(connection: AsyncConnection) -> set[str]: + print("\nStep 1: Collecting Authentik UIDs from database tables...") + used_authentik_uids = set() + + for table in TABLES_WITH_USER_ID: + result = await connection.execute( + text(f'SELECT DISTINCT user_id FROM "{table}" WHERE user_id IS NOT NULL') + ) + authentik_uids = [row[0] for row in result.fetchall()] + used_authentik_uids.update(authentik_uids) + print(f" Found {len(authentik_uids)} unique Authentik UIDs in {table}") + + print(f" Total unique user IDs found: {len(used_authentik_uids)}") + + if used_authentik_uids: + sample_id = next(iter(used_authentik_uids)) + if len(sample_id) == 36 and sample_id.count("-") == 4: + print( + f"\n✅ User IDs are already in UUID format (e.g., {sample_id[:20]}...)" + ) + print("Migration has already been completed!") + return set() + + return used_authentik_uids + + +def filter_users_by_authentik_uid( + authentik_users: list[dict[str, Any]], used_authentik_uids: set[str] +) -> tuple[list[dict[str, Any]], set[str]]: + used_authentik_users = [ + user for user in authentik_users if user.get("uid") in used_authentik_uids + ] + + missing_ids = used_authentik_uids - {u.get("uid") for u in used_authentik_users} + + print( + f" Found {len(used_authentik_users)} matching users in Authentik " + f"(out of {len(authentik_users)} total)" + ) + + if missing_ids: + print( + f" ⚠ Warning: {len(missing_ids)} Authentik UIDs in database not found in Authentik:" + ) + for user_id in sorted(missing_ids): + print(f" - {user_id}") + + return used_authentik_users, missing_ids + + +async def sync_users_to_database( + connection: AsyncConnection, authentik_users: list[dict[str, Any]] +) -> tuple[int, int]: + created = 0 + skipped = 0 + now = datetime.now(timezone.utc) + + for authentik_user in authentik_users: + user_id = authentik_user["uuid"] + authentik_uid = authentik_user["uid"] + email = authentik_user.get("email") + + if not email: + print(f" ⚠ Skipping user {authentik_uid} (no email)") + skipped += 1 + continue + + result = await connection.execute( + text(""" + INSERT INTO "user" (id, email, authentik_uid, created_at, updated_at) + VALUES (:id, :email, :authentik_uid, :created_at, :updated_at) + ON CONFLICT (id) DO NOTHING + """), + { + "id": user_id, + "email": email, + "authentik_uid": authentik_uid, + "created_at": now, + "updated_at": now, + }, + ) + if result.rowcount > 0: + created += 1 + + return created, skipped + + +async def migrate_all_user_ids(connection: AsyncConnection) -> int: + print("\nStep 3: Migrating user_id columns from Authentik UID to internal UUID...") + print("(If no rows are updated, migration may have already been completed)") + + total_updated = 0 + + for table in TABLES_WITH_USER_ID: + null_check = ( + f"AND {table}.user_id IS NOT NULL" + if table in NULLABLE_USER_ID_TABLES + else "" + ) + + query = f""" + UPDATE {table} + SET user_id = u.id + FROM "user" u + WHERE {table}.user_id = u.authentik_uid + {null_check} + """ + + print(f" Updating {table}.user_id...") + result = await connection.execute(text(query)) + rows = result.rowcount + print(f" ✓ Updated {rows} rows") + total_updated += rows + + return total_updated + + +async def run_migration( + database_url: str, authentik_api_url: str, authentik_api_token: str +) -> None: + engine = create_async_engine(database_url) + + try: + async with engine.begin() as connection: + used_authentik_uids = await collect_used_authentik_uids(connection) + if not used_authentik_uids: + print("\n⚠️ No user IDs found in database. Nothing to migrate.") + print("Migration complete (no-op)!") + return + + print("\nStep 2: Fetching user data from Authentik and syncing users...") + print("(This script is idempotent - safe to run multiple times)") + print(f"Authentik API URL: {authentik_api_url}") + + client = AuthentikClient(authentik_api_url, authentik_api_token) + authentik_users = await client.fetch_all_users() + + if not authentik_users: + print("\nERROR: No users returned from Authentik API.") + print( + "Please verify your Authentik configuration and ensure users exist." + ) + sys.exit(1) + + used_authentik_users, _ = filter_users_by_authentik_uid( + authentik_users, used_authentik_uids + ) + created, skipped = await sync_users_to_database( + connection, used_authentik_users + ) + + if created > 0: + print(f"✓ Created {created} users from Authentik") + else: + print("✓ No new users created (users may already exist)") + + if skipped > 0: + print(f" ⚠ Skipped {skipped} users without email") + + result = await connection.execute(text('SELECT COUNT(*) FROM "user"')) + user_count = result.scalar() + print(f"✓ Users table now has {user_count} users") + + total_updated = await migrate_all_user_ids(connection) + + if total_updated > 0: + print(f"\n✅ Migration complete! Updated {total_updated} total rows.") + else: + print( + "\n✅ Migration complete! (No rows updated - migration may have already been completed)" + ) + + except Exception as e: + print(f"\n❌ ERROR: Migration failed: {e}") + sys.exit(1) + finally: + await engine.dispose() + + +async def main() -> None: + database_url = os.getenv("DATABASE_URL") + authentik_api_url = os.getenv("AUTHENTIK_API_URL") + authentik_api_token = os.getenv("AUTHENTIK_API_TOKEN") + + if not database_url or not authentik_api_url or not authentik_api_token: + print( + "ERROR: DATABASE_URL, AUTHENTIK_API_URL, and AUTHENTIK_API_TOKEN must be set" + ) + sys.exit(1) + + await run_migration(database_url, authentik_api_url, authentik_api_token) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/server/tests/test_user_websocket_auth.py b/server/tests/test_user_websocket_auth.py index be1a2816..5a40440f 100644 --- a/server/tests/test_user_websocket_auth.py +++ b/server/tests/test_user_websocket_auth.py @@ -120,7 +120,15 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user host, port = appserver_ws_user base_ws = f"http://{host}:{port}/v1/events" - token = _make_dummy_jwt("user-abc") + # Create a test user in the database + from reflector.db.users import user_controller + + test_uid = "user-abc" + user = await user_controller.create_or_update( + id="test-user-id-abc", authentik_uid=test_uid, email="user-abc@example.com" + ) + + token = _make_dummy_jwt(test_uid) subprotocols = ["bearer", token] # Connect and then trigger an event via HTTP create @@ -132,12 +140,13 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user from reflector.auth import current_user, current_user_optional # Override auth dependencies so HTTP request is performed as the same user + # Use the internal user.id (not the Authentik UID) app.dependency_overrides[current_user] = lambda: { - "sub": "user-abc", + "sub": user.id, "email": "user-abc@example.com", } app.dependency_overrides[current_user_optional] = lambda: { - "sub": "user-abc", + "sub": user.id, "email": "user-abc@example.com", } diff --git a/www/.env.example b/www/.env.example index da46b513..1d0b1d20 100644 --- a/www/.env.example +++ b/www/.env.example @@ -22,9 +22,10 @@ AUTHENTIK_CLIENT_SECRET=your-client-secret-here # API URLs API_URL=http://127.0.0.1:1250 +SERVER_API_URL=http://server:1250 WEBSOCKET_URL=ws://127.0.0.1:1250 AUTH_CALLBACK_URL=http://localhost:3000/auth-callback # Sentry # SENTRY_DSN=https://your-dsn@sentry.io/project-id -# SENTRY_IGNORE_API_RESOLUTION_ERROR=1 \ No newline at end of file +# SENTRY_IGNORE_API_RESOLUTION_ERROR=1 diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index cfefbf6a..2faedb90 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -1,8 +1,8 @@ "use client"; -import { useCallback, useEffect, useRef } from "react"; -import { Box } from "@chakra-ui/react"; -import { useRouter } from "next/navigation"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { Box, Spinner, Center, Text } from "@chakra-ui/react"; +import { useRouter, useParams } from "next/navigation"; import DailyIframe, { DailyCall } from "@daily-co/daily-js"; import type { components } from "../../reflector-api"; import { useAuth } from "../../lib/AuthProvider"; @@ -10,6 +10,7 @@ import { ConsentDialogButton, recordingTypeRequiresConsent, } from "../../lib/consent"; +import { useRoomJoinMeeting } from "../../lib/apiHooks"; type Meeting = components["schemas"]["Meeting"]; @@ -19,13 +20,41 @@ interface DailyRoomProps { export default function DailyRoom({ meeting }: DailyRoomProps) { const router = useRouter(); + const params = useParams(); const auth = useAuth(); const status = auth.status; const containerRef = useRef(null); + const joinMutation = useRoomJoinMeeting(); + const [joinedMeeting, setJoinedMeeting] = useState(null); - const roomUrl = meeting?.host_room_url || meeting?.room_url; + const roomName = params?.roomName as string; - const isLoading = status === "loading"; + // Always call /join to get a fresh token with user_id + useEffect(() => { + if (status === "loading" || !meeting?.id || !roomName) return; + + const join = async () => { + try { + const result = await joinMutation.mutateAsync({ + params: { + path: { + room_name: roomName, + meeting_id: meeting.id, + }, + }, + }); + setJoinedMeeting(result); + } catch (error) { + console.error("Failed to join meeting:", error); + } + }; + + join(); + }, [meeting?.id, roomName, status]); + + const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url; + const isLoading = + status === "loading" || joinMutation.isPending || !joinedMeeting; const handleLeave = useCallback(() => { router.push("/browse"); @@ -87,6 +116,22 @@ export default function DailyRoom({ meeting }: DailyRoomProps) { }; }, [roomUrl, isLoading, handleLeave]); + if (isLoading) { + return ( +
+ +
+ ); + } + + if (joinMutation.isError) { + return ( +
+ Failed to join meeting. Please try again. +
+ ); + } + if (!roomUrl) { return null; } diff --git a/www/app/lib/authBackend.ts b/www/app/lib/authBackend.ts index a44f1d36..7a8fa433 100644 --- a/www/app/lib/authBackend.ts +++ b/www/app/lib/authBackend.ts @@ -22,6 +22,27 @@ import { sequenceThrows } from "./errorUtils"; import { featureEnabled } from "./features"; import { getNextEnvVar } from "./nextBuild"; +async function getUserId(accessToken: string): Promise { + try { + const apiUrl = getNextEnvVar("SERVER_API_URL"); + const response = await fetch(`${apiUrl}/v1/me`, { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + }); + + if (!response.ok) { + return null; + } + + const userInfo = await response.json(); + return userInfo.sub || null; + } catch (error) { + console.error("Error fetching user ID from backend:", error); + return null; + } +} + const TOKEN_CACHE_TTL = REFRESH_ACCESS_TOKEN_BEFORE; const getAuthentikClientId = () => getNextEnvVar("AUTHENTIK_CLIENT_ID"); const getAuthentikClientSecret = () => getNextEnvVar("AUTHENTIK_CLIENT_SECRET"); @@ -122,13 +143,16 @@ export const authOptions = (): AuthOptions => }, async session({ session, token }) { const extendedToken = token as JWTWithAccessToken; + + const userId = await getUserId(extendedToken.accessToken); + return { ...session, accessToken: extendedToken.accessToken, accessTokenExpires: extendedToken.accessTokenExpires, error: extendedToken.error, user: { - id: assertExists(extendedToken.sub), + id: assertExistsAndNonEmptyString(userId, "User ID required"), name: extendedToken.name, email: extendedToken.email, },