mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2801ab3643 | |||
|
|
b20cad76e6 | ||
| 28a7258e45 | |||
| a9a4f32324 | |||
|
|
857e035562 |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -1,5 +1,18 @@
|
||||
# Changelog
|
||||
|
||||
## [0.18.0](https://github.com/Monadical-SAS/reflector/compare/v0.17.0...v0.18.0) (2025-11-14)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* daily QOL: participants dictionary ([#721](https://github.com/Monadical-SAS/reflector/issues/721)) ([b20cad7](https://github.com/Monadical-SAS/reflector/commit/b20cad76e69fb6a76405af299a005f1ddcf60eae))
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* add proccessing page to file upload and reprocessing ([#650](https://github.com/Monadical-SAS/reflector/issues/650)) ([28a7258](https://github.com/Monadical-SAS/reflector/commit/28a7258e45317b78e60e6397be2bc503647eaace))
|
||||
* copy transcript ([#674](https://github.com/Monadical-SAS/reflector/issues/674)) ([a9a4f32](https://github.com/Monadical-SAS/reflector/commit/a9a4f32324f66c838e081eee42bb9502f38c1db1))
|
||||
|
||||
## [0.17.0](https://github.com/Monadical-SAS/reflector/compare/v0.16.0...v0.17.0) (2025-11-13)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
"""add daily participant session table with immutable left_at
|
||||
|
||||
Revision ID: 2b92a1b03caa
|
||||
Revises: f8294b31f022
|
||||
Create Date: 2025-11-13 20:29:30.486577
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "2b92a1b03caa"
|
||||
down_revision: Union[str, None] = "f8294b31f022"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# Create table
|
||||
op.create_table(
|
||||
"daily_participant_session",
|
||||
sa.Column("id", sa.String(), nullable=False),
|
||||
sa.Column("meeting_id", sa.String(), nullable=False),
|
||||
sa.Column("room_id", sa.String(), nullable=False),
|
||||
sa.Column("session_id", sa.String(), nullable=False),
|
||||
sa.Column("user_id", sa.String(), nullable=True),
|
||||
sa.Column("user_name", sa.String(), nullable=False),
|
||||
sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("left_at", sa.DateTime(timezone=True), nullable=True),
|
||||
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
|
||||
sa.ForeignKeyConstraint(["room_id"], ["room.id"], ondelete="CASCADE"),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
with op.batch_alter_table("daily_participant_session", schema=None) as batch_op:
|
||||
batch_op.create_index(
|
||||
"idx_daily_session_meeting_left", ["meeting_id", "left_at"], unique=False
|
||||
)
|
||||
batch_op.create_index("idx_daily_session_room", ["room_id"], unique=False)
|
||||
|
||||
# Create trigger function to prevent left_at from being updated once set
|
||||
op.execute("""
|
||||
CREATE OR REPLACE FUNCTION prevent_left_at_update()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF OLD.left_at IS NOT NULL THEN
|
||||
RAISE EXCEPTION 'left_at is immutable once set';
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""")
|
||||
|
||||
# Create trigger
|
||||
op.execute("""
|
||||
CREATE TRIGGER prevent_left_at_update_trigger
|
||||
BEFORE UPDATE ON daily_participant_session
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION prevent_left_at_update();
|
||||
""")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# Drop trigger
|
||||
op.execute(
|
||||
"DROP TRIGGER IF EXISTS prevent_left_at_update_trigger ON daily_participant_session;"
|
||||
)
|
||||
|
||||
# Drop trigger function
|
||||
op.execute("DROP FUNCTION IF EXISTS prevent_left_at_update();")
|
||||
|
||||
# Drop indexes and table
|
||||
with op.batch_alter_table("daily_participant_session", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_daily_session_room")
|
||||
batch_op.drop_index("idx_daily_session_meeting_left")
|
||||
|
||||
op.drop_table("daily_participant_session")
|
||||
@@ -25,6 +25,7 @@ def get_database() -> databases.Database:
|
||||
|
||||
# import models
|
||||
import reflector.db.calendar_events # noqa
|
||||
import reflector.db.daily_participant_sessions # noqa
|
||||
import reflector.db.meetings # noqa
|
||||
import reflector.db.recordings # noqa
|
||||
import reflector.db.rooms # noqa
|
||||
|
||||
169
server/reflector/db/daily_participant_sessions.py
Normal file
169
server/reflector/db/daily_participant_sessions.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Daily.co participant session tracking.
|
||||
|
||||
Stores webhook data for participant.joined and participant.left events to provide
|
||||
historical session information (Daily.co API only returns current participants).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
|
||||
from reflector.db import get_database, metadata
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
daily_participant_sessions = sa.Table(
|
||||
"daily_participant_session",
|
||||
metadata,
|
||||
sa.Column("id", sa.String, primary_key=True),
|
||||
sa.Column(
|
||||
"meeting_id",
|
||||
sa.String,
|
||||
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column(
|
||||
"room_id",
|
||||
sa.String,
|
||||
sa.ForeignKey("room.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("session_id", sa.String, nullable=False),
|
||||
sa.Column("user_id", sa.String, nullable=True),
|
||||
sa.Column("user_name", sa.String, nullable=False),
|
||||
sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("left_at", sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Index("idx_daily_session_meeting_left", "meeting_id", "left_at"),
|
||||
sa.Index("idx_daily_session_room", "room_id"),
|
||||
)
|
||||
|
||||
|
||||
class DailyParticipantSession(BaseModel):
|
||||
"""Daily.co participant session record.
|
||||
|
||||
Tracks when a participant joined and left a meeting. Populated from webhooks:
|
||||
- participant.joined: Creates record with left_at=None
|
||||
- participant.left: Updates record with left_at
|
||||
|
||||
ID format: {meeting_id}:{user_id}:{joined_at_ms}
|
||||
- Ensures idempotency (duplicate webhooks don't create duplicates)
|
||||
- Allows same user to rejoin (different joined_at = different session)
|
||||
|
||||
Duration is calculated as: left_at - joined_at (not stored)
|
||||
"""
|
||||
|
||||
id: NonEmptyString
|
||||
meeting_id: NonEmptyString
|
||||
room_id: NonEmptyString
|
||||
session_id: NonEmptyString # Daily.co's session_id (identifies room session)
|
||||
user_id: NonEmptyString | None = None
|
||||
user_name: str
|
||||
joined_at: datetime
|
||||
left_at: datetime | None = None
|
||||
|
||||
|
||||
class DailyParticipantSessionController:
|
||||
"""Controller for Daily.co participant session persistence."""
|
||||
|
||||
async def get_by_id(self, id: str) -> DailyParticipantSession | None:
|
||||
"""Get a session by its ID."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
daily_participant_sessions.c.id == id
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
return DailyParticipantSession(**result) if result else None
|
||||
|
||||
async def get_open_session(
|
||||
self, meeting_id: NonEmptyString, session_id: NonEmptyString
|
||||
) -> DailyParticipantSession | None:
|
||||
"""Get the open (not left) session for a user in a meeting."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
sa.and_(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id,
|
||||
daily_participant_sessions.c.session_id == session_id,
|
||||
daily_participant_sessions.c.left_at.is_(None),
|
||||
)
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
|
||||
if len(results) > 1:
|
||||
raise ValueError(
|
||||
f"Multiple open sessions for daily session {session_id} in meeting {meeting_id}: "
|
||||
f"found {len(results)} sessions"
|
||||
)
|
||||
|
||||
return DailyParticipantSession(**results[0]) if results else None
|
||||
|
||||
async def upsert_joined(self, session: DailyParticipantSession) -> None:
|
||||
"""Insert or update when participant.joined webhook arrives.
|
||||
|
||||
Idempotent: Duplicate webhooks with same ID are safely ignored.
|
||||
Out-of-order: If left webhook arrived first, preserves left_at.
|
||||
"""
|
||||
query = insert(daily_participant_sessions).values(**session.model_dump())
|
||||
query = query.on_conflict_do_update(
|
||||
index_elements=["id"],
|
||||
set_={"user_name": session.user_name},
|
||||
)
|
||||
await get_database().execute(query)
|
||||
|
||||
async def upsert_left(self, session: DailyParticipantSession) -> None:
|
||||
"""Update session when participant.left webhook arrives.
|
||||
|
||||
Finds the open session for this user in this meeting and updates left_at.
|
||||
Works around Daily.co webhook timestamp inconsistency (joined_at differs by ~4ms between webhooks).
|
||||
|
||||
Handles three cases:
|
||||
1. Normal flow: open session exists → updates left_at
|
||||
2. Out-of-order: left arrives first → creates new record with left data
|
||||
3. Duplicate: left arrives again → idempotent (DB trigger prevents left_at modification)
|
||||
"""
|
||||
if session.left_at is None:
|
||||
raise ValueError("left_at is required for upsert_left")
|
||||
|
||||
if session.left_at <= session.joined_at:
|
||||
raise ValueError(
|
||||
f"left_at ({session.left_at}) must be after joined_at ({session.joined_at})"
|
||||
)
|
||||
|
||||
# Find existing open session (works around timestamp mismatch in webhooks)
|
||||
existing = await self.get_open_session(session.meeting_id, session.session_id)
|
||||
|
||||
if existing:
|
||||
# Update existing open session
|
||||
query = (
|
||||
daily_participant_sessions.update()
|
||||
.where(daily_participant_sessions.c.id == existing.id)
|
||||
.values(left_at=session.left_at)
|
||||
)
|
||||
await get_database().execute(query)
|
||||
else:
|
||||
# Out-of-order or first webhook: insert new record
|
||||
query = insert(daily_participant_sessions).values(**session.model_dump())
|
||||
query = query.on_conflict_do_nothing(index_elements=["id"])
|
||||
await get_database().execute(query)
|
||||
|
||||
async def get_by_meeting(self, meeting_id: str) -> list[DailyParticipantSession]:
|
||||
"""Get all participant sessions for a meeting (active and ended)."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [DailyParticipantSession(**result) for result in results]
|
||||
|
||||
async def get_active_by_meeting(
|
||||
self, meeting_id: str
|
||||
) -> list[DailyParticipantSession]:
|
||||
"""Get only active (not left) participant sessions for a meeting."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
sa.and_(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id,
|
||||
daily_participant_sessions.c.left_at.is_(None),
|
||||
)
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [DailyParticipantSession(**result) for result in results]
|
||||
|
||||
|
||||
daily_participant_sessions_controller = DailyParticipantSessionController()
|
||||
@@ -1,10 +1,10 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||
|
||||
from ..schemas.platform import Platform
|
||||
from ..utils.string import NonEmptyString
|
||||
from .models import MeetingData, VideoPlatformConfig
|
||||
from .models import MeetingData, SessionData, VideoPlatformConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from reflector.db.rooms import Room
|
||||
@@ -26,7 +26,8 @@ class VideoPlatformClient(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_room_sessions(self, room_name: str) -> List[Any] | None:
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get session history for a room."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -3,10 +3,13 @@ import hmac
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.rooms import Room
|
||||
from reflector.logger import logger
|
||||
from reflector.storage import get_dailyco_storage
|
||||
@@ -15,7 +18,7 @@ from ..schemas.platform import Platform
|
||||
from ..utils.daily import DailyRoomName
|
||||
from ..utils.string import NonEmptyString
|
||||
from .base import ROOM_PREFIX_SEPARATOR, VideoPlatformClient
|
||||
from .models import MeetingData, RecordingType, VideoPlatformConfig
|
||||
from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig
|
||||
|
||||
|
||||
class DailyClient(VideoPlatformClient):
|
||||
@@ -61,7 +64,8 @@ class DailyClient(VideoPlatformClient):
|
||||
},
|
||||
}
|
||||
|
||||
# Get storage config for passing to Daily API
|
||||
# Only configure recordings_bucket if recording is enabled
|
||||
if room.recording_type != self.RECORDING_NONE:
|
||||
daily_storage = get_dailyco_storage()
|
||||
assert daily_storage.bucket_name, "S3 bucket must be configured"
|
||||
data["properties"]["recordings_bucket"] = {
|
||||
@@ -70,7 +74,6 @@ class DailyClient(VideoPlatformClient):
|
||||
"assume_role_arn": daily_storage.role_credential,
|
||||
"allow_api_access": True,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{self.BASE_URL}/rooms",
|
||||
@@ -99,11 +102,49 @@ class DailyClient(VideoPlatformClient):
|
||||
extra_data=result,
|
||||
)
|
||||
|
||||
async def get_room_sessions(self, room_name: str) -> List[Any] | None:
|
||||
# no such api
|
||||
return None
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get room session history from database (webhook-stored sessions).
|
||||
|
||||
Daily.co doesn't provide historical session API, so we query our database
|
||||
where participant.joined/left webhooks are stored.
|
||||
"""
|
||||
from reflector.db.meetings import meetings_controller
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if not meeting:
|
||||
return []
|
||||
|
||||
sessions = await daily_participant_sessions_controller.get_by_meeting(
|
||||
meeting.id
|
||||
)
|
||||
|
||||
return [
|
||||
SessionData(
|
||||
session_id=s.id,
|
||||
started_at=s.joined_at,
|
||||
ended_at=s.left_at,
|
||||
)
|
||||
for s in sessions
|
||||
]
|
||||
|
||||
async def get_room_presence(self, room_name: str) -> Dict[str, Any]:
|
||||
"""Get room presence/session data for a Daily.co room.
|
||||
|
||||
Example response:
|
||||
{
|
||||
"total_count": 1,
|
||||
"data": [
|
||||
{
|
||||
"room": "w2pp2cf4kltgFACPKXmX",
|
||||
"id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
|
||||
"userId": "pbZ+ismP7dk=",
|
||||
"userName": "Moishe",
|
||||
"joinTime": "2023-01-01T20:53:19.000Z",
|
||||
"duration": 2312
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{self.BASE_URL}/rooms/{room_name}/presence",
|
||||
@@ -114,6 +155,28 @@ class DailyClient(VideoPlatformClient):
|
||||
return response.json()
|
||||
|
||||
async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]:
|
||||
"""Get participant data for a specific Daily.co meeting.
|
||||
|
||||
Example response:
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"user_id": "4q47OTmqa/w=",
|
||||
"participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
|
||||
"user_name": "Lindsey",
|
||||
"join_time": 1672786813,
|
||||
"duration": 150
|
||||
},
|
||||
{
|
||||
"user_id": "pbZ+ismP7dk=",
|
||||
"participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6",
|
||||
"user_name": "Moishe",
|
||||
"join_time": 1672786797,
|
||||
"duration": 165
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{self.BASE_URL}/meetings/{meeting_id}/participants",
|
||||
|
||||
@@ -1,18 +1,38 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
RecordingType = Literal["none", "local", "cloud"]
|
||||
|
||||
|
||||
class SessionData(BaseModel):
|
||||
"""Platform-agnostic session data.
|
||||
|
||||
Represents a participant session in a meeting room, regardless of platform.
|
||||
Used to determine if a meeting is still active or has ended.
|
||||
"""
|
||||
|
||||
session_id: NonEmptyString = Field(description="Unique session identifier")
|
||||
started_at: datetime = Field(description="When session started (UTC)")
|
||||
ended_at: datetime | None = Field(
|
||||
description="When session ended (UTC), None if still active"
|
||||
)
|
||||
|
||||
|
||||
class MeetingData(BaseModel):
|
||||
platform: Platform
|
||||
meeting_id: str = Field(description="Platform-specific meeting identifier")
|
||||
room_url: str = Field(description="URL for participants to join")
|
||||
host_room_url: str = Field(description="URL for hosts (may be same as room_url)")
|
||||
room_name: str = Field(description="Human-readable room name")
|
||||
meeting_id: NonEmptyString = Field(
|
||||
description="Platform-specific meeting identifier"
|
||||
)
|
||||
room_url: NonEmptyString = Field(description="URL for participants to join")
|
||||
host_room_url: NonEmptyString = Field(
|
||||
description="URL for hosts (may be same as room_url)"
|
||||
)
|
||||
room_name: NonEmptyString = Field(description="Human-readable room name")
|
||||
extra_data: Dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
class Config:
|
||||
|
||||
@@ -4,7 +4,7 @@ import re
|
||||
import time
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
@@ -13,11 +13,8 @@ from reflector.storage import get_whereby_storage
|
||||
|
||||
from ..schemas.platform import WHEREBY_PLATFORM, Platform
|
||||
from ..utils.string import NonEmptyString
|
||||
from .base import (
|
||||
MeetingData,
|
||||
VideoPlatformClient,
|
||||
VideoPlatformConfig,
|
||||
)
|
||||
from .base import VideoPlatformClient
|
||||
from .models import MeetingData, SessionData, VideoPlatformConfig
|
||||
from .whereby_utils import whereby_room_name_prefix
|
||||
|
||||
|
||||
@@ -80,15 +77,50 @@ class WherebyClient(VideoPlatformClient):
|
||||
extra_data=result,
|
||||
)
|
||||
|
||||
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get room session history from Whereby API.
|
||||
|
||||
Whereby API returns: [{"sessionId": "...", "startedAt": "...", "endedAt": "..." | null}, ...]
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
"""
|
||||
{
|
||||
"cursor": "text",
|
||||
"results": [
|
||||
{
|
||||
"roomSessionId": "e2f29530-46ec-4cee-8b27-e565cb5bb2e9",
|
||||
"roomName": "/room-prefix-793e9ec1-c686-423d-9043-9b7a10c553fd",
|
||||
"startedAt": "2025-01-01T00:00:00.000Z",
|
||||
"endedAt": "2025-01-01T01:00:00.000Z",
|
||||
"totalParticipantMinutes": 124,
|
||||
"totalRecorderMinutes": 120,
|
||||
"totalStreamerMinutes": 120,
|
||||
"totalUniqueParticipants": 4,
|
||||
"totalUniqueRecorders": 3,
|
||||
"totalUniqueStreamers": 2
|
||||
}
|
||||
]
|
||||
}"""
|
||||
response = await client.get(
|
||||
f"{self.config.api_url}/insights/room-sessions?roomName={room_name}",
|
||||
headers=self.headers,
|
||||
timeout=self.TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json().get("results", [])
|
||||
results = response.json().get("results", [])
|
||||
|
||||
return [
|
||||
SessionData(
|
||||
session_id=s["roomSessionId"],
|
||||
started_at=datetime.fromisoformat(
|
||||
s["startedAt"].replace("Z", "+00:00")
|
||||
),
|
||||
ended_at=datetime.fromisoformat(s["endedAt"].replace("Z", "+00:00"))
|
||||
if s.get("endedAt")
|
||||
else None,
|
||||
)
|
||||
for s in results
|
||||
]
|
||||
|
||||
async def delete_room(self, room_name: str) -> bool:
|
||||
return True
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Literal
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
from pydantic import BaseModel
|
||||
|
||||
from reflector.db import get_database
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
DailyParticipantSession,
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.logger import logger as _logger
|
||||
from reflector.settings import settings
|
||||
@@ -44,6 +50,24 @@ def _extract_room_name(event: DailyWebhookEvent) -> DailyRoomName | None:
|
||||
async def webhook(request: Request):
|
||||
"""Handle Daily webhook events.
|
||||
|
||||
Example webhook payload:
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "recording.ready-to-download",
|
||||
"id": "rec-rtd-c3df927c-f738-4471-a2b7-066fa7e95a6b-1692124192",
|
||||
"payload": {
|
||||
"recording_id": "08fa0b24-9220-44c5-846c-3f116cf8e738",
|
||||
"room_name": "Xcm97xRZ08b2dePKb78g",
|
||||
"start_ts": 1692124183,
|
||||
"status": "finished",
|
||||
"max_participants": 1,
|
||||
"duration": 9,
|
||||
"share_token": "ntDCL5k98Ulq", #gitleaks:allow
|
||||
"s3_key": "api-test-1j8fizhzd30c/Xcm97xRZ08b2dePKb78g/1692124183028"
|
||||
},
|
||||
"event_ts": 1692124192
|
||||
}
|
||||
|
||||
Daily.co circuit-breaker: After 3+ failed responses (4xx/5xx), webhook
|
||||
state→FAILED, stops sending events. Reset: scripts/recreate_daily_webhook.py
|
||||
"""
|
||||
@@ -103,6 +127,32 @@ async def webhook(request: Request):
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
"""
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "participant.joined",
|
||||
"id": "ptcpt-join-6497c79b-f326-4942-aef8-c36a29140ad1-1708972279961",
|
||||
"payload": {
|
||||
"room": "test",
|
||||
"user_id": "6497c79b-f326-4942-aef8-c36a29140ad1",
|
||||
"user_name": "testuser",
|
||||
"session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa",
|
||||
"joined_at": 1708972279.96,
|
||||
"will_eject_at": 1708972299.541,
|
||||
"owner": false,
|
||||
"permissions": {
|
||||
"hasPresence": true,
|
||||
"canSend": true,
|
||||
"canReceive": { "base": true },
|
||||
"canAdmin": false
|
||||
}
|
||||
},
|
||||
"event_ts": 1708972279.961
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
async def _handle_participant_joined(event: DailyWebhookEvent):
|
||||
daily_room_name = _extract_room_name(event)
|
||||
if not daily_room_name:
|
||||
@@ -110,29 +160,111 @@ async def _handle_participant_joined(event: DailyWebhookEvent):
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(daily_room_name)
|
||||
if meeting:
|
||||
if not meeting:
|
||||
logger.warning(
|
||||
"participant.joined: meeting not found", room_name=daily_room_name
|
||||
)
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
logger.warning({"payload": payload})
|
||||
joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc)
|
||||
session_id = f"{meeting.id}:{payload['session_id']}"
|
||||
|
||||
session = DailyParticipantSession(
|
||||
id=session_id,
|
||||
meeting_id=meeting.id,
|
||||
room_id=meeting.room_id,
|
||||
session_id=payload["session_id"],
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload["user_name"],
|
||||
joined_at=joined_at,
|
||||
left_at=None,
|
||||
)
|
||||
|
||||
# num_clients serves as a projection/cache of active session count for Daily.co
|
||||
# Both operations must succeed or fail together to maintain consistency
|
||||
async with get_database().transaction():
|
||||
await meetings_controller.increment_num_clients(meeting.id)
|
||||
await daily_participant_sessions_controller.upsert_joined(session)
|
||||
|
||||
logger.info(
|
||||
"Participant joined",
|
||||
meeting_id=meeting.id,
|
||||
room_name=daily_room_name,
|
||||
recording_type=meeting.recording_type,
|
||||
recording_trigger=meeting.recording_trigger,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"participant.joined: meeting not found", room_name=daily_room_name
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload.get("user_name"),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
"""
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "participant.left",
|
||||
"id": "ptcpt-left-16168c97-f973-4eae-9642-020fe3fda5db-1708972302986",
|
||||
"payload": {
|
||||
"room": "test",
|
||||
"user_id": "16168c97-f973-4eae-9642-020fe3fda5db",
|
||||
"user_name": "bipol",
|
||||
"session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa",
|
||||
"joined_at": 1708972291.567,
|
||||
"will_eject_at": null,
|
||||
"owner": false,
|
||||
"permissions": {
|
||||
"hasPresence": true,
|
||||
"canSend": true,
|
||||
"canReceive": { "base": true },
|
||||
"canAdmin": false
|
||||
},
|
||||
"duration": 11.419000148773193
|
||||
},
|
||||
"event_ts": 1708972302.986
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
async def _handle_participant_left(event: DailyWebhookEvent):
|
||||
room_name = _extract_room_name(event)
|
||||
if not room_name:
|
||||
logger.warning("participant.left: no room in payload", payload=event.payload)
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if meeting:
|
||||
if not meeting:
|
||||
logger.warning("participant.left: meeting not found", room_name=room_name)
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc)
|
||||
left_at = datetime.fromtimestamp(event.event_ts, tz=timezone.utc)
|
||||
session_id = f"{meeting.id}:{payload['session_id']}"
|
||||
|
||||
session = DailyParticipantSession(
|
||||
id=session_id,
|
||||
meeting_id=meeting.id,
|
||||
room_id=meeting.room_id,
|
||||
session_id=payload["session_id"],
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload["user_name"],
|
||||
joined_at=joined_at,
|
||||
left_at=left_at,
|
||||
)
|
||||
|
||||
# num_clients serves as a projection/cache of active session count for Daily.co
|
||||
# Both operations must succeed or fail together to maintain consistency
|
||||
async with get_database().transaction():
|
||||
await meetings_controller.decrement_num_clients(meeting.id)
|
||||
await daily_participant_sessions_controller.upsert_left(session)
|
||||
|
||||
logger.info(
|
||||
"Participant left",
|
||||
meeting_id=meeting.id,
|
||||
room_name=room_name,
|
||||
user_id=payload.get("user_id", None),
|
||||
duration=payload.get("duration"),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
async def _handle_recording_started(event: DailyWebhookEvent):
|
||||
|
||||
@@ -55,9 +55,18 @@ async def transcript_process(
|
||||
recording = await recordings_controller.get_by_id(transcript.recording_id)
|
||||
if recording:
|
||||
bucket_name = recording.bucket_name
|
||||
track_keys = list(getattr(recording, "track_keys", []) or [])
|
||||
track_keys = recording.track_keys
|
||||
if track_keys is not None and len(track_keys) == 0:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="No track keys found, must be either > 0 or None",
|
||||
)
|
||||
if track_keys is not None and not bucket_name:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Bucket name must be specified"
|
||||
)
|
||||
|
||||
if bucket_name:
|
||||
if track_keys:
|
||||
task_pipeline_multitrack_process.delay(
|
||||
transcript_id=transcript_id,
|
||||
bucket_name=bucket_name,
|
||||
|
||||
@@ -107,7 +107,7 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
|
||||
client = create_platform_client(get_platform(room.platform))
|
||||
|
||||
meeting_data = await client.create_meeting(
|
||||
"",
|
||||
room.name,
|
||||
end_date=end_date,
|
||||
room=room,
|
||||
)
|
||||
|
||||
@@ -335,15 +335,15 @@ async def process_meetings():
|
||||
Uses distributed locking to prevent race conditions when multiple workers
|
||||
process the same meeting simultaneously.
|
||||
"""
|
||||
logger.debug("Processing meetings")
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
logger.info(f"Processing {len(meetings)} meetings")
|
||||
current_time = datetime.now(timezone.utc)
|
||||
redis_client = get_redis_client()
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for meeting in meetings:
|
||||
logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
|
||||
logger_.info("Processing meeting")
|
||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=120)
|
||||
|
||||
@@ -359,21 +359,23 @@ async def process_meetings():
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
# This API call could be slow, extend lock if needed
|
||||
client = create_platform_client(meeting.platform)
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
|
||||
try:
|
||||
# Extend lock after slow operation to ensure we still hold it
|
||||
# Extend lock after operation to ensure we still hold it
|
||||
lock.extend(120, replace_ttl=True)
|
||||
except LockError:
|
||||
logger_.warning("Lost lock for meeting, skipping")
|
||||
continue
|
||||
|
||||
has_active_sessions = room_sessions and any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
s.ended_at is None for s in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
|
||||
)
|
||||
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
|
||||
91
server/scripts/list_daily_webhooks.py
Executable file
91
server/scripts/list_daily_webhooks.py
Executable file
@@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
import httpx
|
||||
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
async def list_webhooks():
|
||||
"""
|
||||
List all Daily.co webhooks for this account.
|
||||
"""
|
||||
if not settings.DAILY_API_KEY:
|
||||
print("Error: DAILY_API_KEY not set")
|
||||
return 1
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {settings.DAILY_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
try:
|
||||
"""
|
||||
Daily.co webhook list response format:
|
||||
[
|
||||
{
|
||||
"uuid": "0b4e4c7c-5eaf-46fe-990b-a3752f5684f5",
|
||||
"url": "{{webhook_url}}",
|
||||
"hmac": "NQrSA5z0FkJ44QPrFerW7uCc5kdNLv3l2FDEKDanL1U=",
|
||||
"basicAuth": null,
|
||||
"eventTypes": [
|
||||
"recording.started",
|
||||
"recording.ready-to-download"
|
||||
],
|
||||
"state": "ACTVIE",
|
||||
"failedCount": 0,
|
||||
"lastMomentPushed": "2023-08-15T18:29:52.000Z",
|
||||
"domainId": "{{domain_id}}",
|
||||
"createdAt": "2023-08-15T18:28:30.000Z",
|
||||
"updatedAt": "2023-08-15T18:29:52.000Z"
|
||||
}
|
||||
]
|
||||
"""
|
||||
resp = await client.get(
|
||||
"https://api.daily.co/v1/webhooks",
|
||||
headers=headers,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
webhooks = resp.json()
|
||||
|
||||
if not webhooks:
|
||||
print("No webhooks found")
|
||||
return 0
|
||||
|
||||
print(f"Found {len(webhooks)} webhook(s):\n")
|
||||
|
||||
for webhook in webhooks:
|
||||
print("=" * 80)
|
||||
print(f"UUID: {webhook['uuid']}")
|
||||
print(f"URL: {webhook['url']}")
|
||||
print(f"State: {webhook['state']}")
|
||||
print(f"Event Types: {', '.join(webhook.get('eventTypes', []))}")
|
||||
print(
|
||||
f"HMAC Secret: {'✓ Configured' if webhook.get('hmac') else '✗ Not set'}"
|
||||
)
|
||||
print()
|
||||
|
||||
print("=" * 80)
|
||||
print(
|
||||
f"\nCurrent DAILY_WEBHOOK_UUID in settings: {settings.DAILY_WEBHOOK_UUID or '(not set)'}"
|
||||
)
|
||||
|
||||
return 0
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
print(f"Error fetching webhooks: {e}")
|
||||
print(f"Response: {e.response.text}")
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(f"Unexpected error: {e}")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(asyncio.run(list_webhooks()))
|
||||
@@ -3,9 +3,11 @@ from datetime import datetime
|
||||
from typing import Any, Dict, Literal, Optional
|
||||
|
||||
from reflector.db.rooms import Room
|
||||
from reflector.utils.string import NonEmptyString
|
||||
from reflector.video_platforms.base import (
|
||||
ROOM_PREFIX_SEPARATOR,
|
||||
MeetingData,
|
||||
SessionData,
|
||||
VideoPlatformClient,
|
||||
VideoPlatformConfig,
|
||||
)
|
||||
@@ -49,22 +51,18 @@ class MockPlatformClient(VideoPlatformClient):
|
||||
extra_data={"mock": True},
|
||||
)
|
||||
|
||||
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
|
||||
async def get_room_sessions(self, room_name: NonEmptyString) -> list[SessionData]:
|
||||
if room_name not in self._rooms:
|
||||
return {"error": "Room not found"}
|
||||
return []
|
||||
|
||||
room_data = self._rooms[room_name]
|
||||
return {
|
||||
"roomName": room_name,
|
||||
"sessions": [
|
||||
{
|
||||
"sessionId": room_data["id"],
|
||||
"startTime": datetime.utcnow().isoformat(),
|
||||
"participants": room_data["participants"],
|
||||
"isActive": room_data["is_active"],
|
||||
}
|
||||
],
|
||||
}
|
||||
return [
|
||||
SessionData(
|
||||
session_id=room_data["id"],
|
||||
started_at=datetime.utcnow(),
|
||||
ended_at=None if room_data["is_active"] else datetime.utcnow(),
|
||||
)
|
||||
]
|
||||
|
||||
async def delete_room(self, room_name: str) -> bool:
|
||||
if room_name in self._rooms:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
@@ -101,3 +102,113 @@ async def test_transcript_process(
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 1
|
||||
assert "Hello world. How are you today?" in response.json()[0]["transcript"]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
@pytest.mark.asyncio
|
||||
async def test_whereby_recording_uses_file_pipeline(client):
|
||||
"""Test that Whereby recordings (bucket_name but no track_keys) use file pipeline"""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
from reflector.db.transcripts import transcripts_controller
|
||||
|
||||
# Create transcript with Whereby recording (has bucket_name, no track_keys)
|
||||
transcript = await transcripts_controller.add(
|
||||
"",
|
||||
source_kind="room",
|
||||
source_language="en",
|
||||
target_language="en",
|
||||
user_id="test-user",
|
||||
share_mode="public",
|
||||
)
|
||||
|
||||
recording = await recordings_controller.create(
|
||||
Recording(
|
||||
bucket_name="whereby-bucket",
|
||||
object_key="test-recording.mp4", # gitleaks:allow
|
||||
meeting_id="test-meeting",
|
||||
recorded_at=datetime.now(timezone.utc),
|
||||
track_keys=None, # Whereby recordings have no track_keys
|
||||
)
|
||||
)
|
||||
|
||||
await transcripts_controller.update(
|
||||
transcript, {"recording_id": recording.id, "status": "uploaded"}
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"reflector.views.transcripts_process.task_pipeline_file_process"
|
||||
) as mock_file_pipeline,
|
||||
patch(
|
||||
"reflector.views.transcripts_process.task_pipeline_multitrack_process"
|
||||
) as mock_multitrack_pipeline,
|
||||
):
|
||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "ok"
|
||||
|
||||
# Whereby recordings should use file pipeline
|
||||
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
|
||||
mock_multitrack_pipeline.delay.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
@pytest.mark.asyncio
|
||||
async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
||||
"""Test that Daily.co recordings (bucket_name + track_keys) use multitrack pipeline"""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
from reflector.db.transcripts import transcripts_controller
|
||||
|
||||
# Create transcript with Daily.co multitrack recording
|
||||
transcript = await transcripts_controller.add(
|
||||
"",
|
||||
source_kind="room",
|
||||
source_language="en",
|
||||
target_language="en",
|
||||
user_id="test-user",
|
||||
share_mode="public",
|
||||
)
|
||||
|
||||
track_keys = [
|
||||
"recordings/test-room/track1.webm",
|
||||
"recordings/test-room/track2.webm",
|
||||
]
|
||||
recording = await recordings_controller.create(
|
||||
Recording(
|
||||
bucket_name="daily-bucket",
|
||||
object_key="recordings/test-room",
|
||||
meeting_id="test-meeting",
|
||||
track_keys=track_keys,
|
||||
recorded_at=datetime.now(timezone.utc),
|
||||
)
|
||||
)
|
||||
|
||||
await transcripts_controller.update(
|
||||
transcript, {"recording_id": recording.id, "status": "uploaded"}
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"reflector.views.transcripts_process.task_pipeline_file_process"
|
||||
) as mock_file_pipeline,
|
||||
patch(
|
||||
"reflector.views.transcripts_process.task_pipeline_multitrack_process"
|
||||
) as mock_multitrack_pipeline,
|
||||
):
|
||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "ok"
|
||||
|
||||
# Daily.co multitrack recordings should use multitrack pipeline
|
||||
mock_multitrack_pipeline.delay.assert_called_once_with(
|
||||
transcript_id=transcript.id,
|
||||
bucket_name="daily-bucket",
|
||||
track_keys=track_keys,
|
||||
)
|
||||
mock_file_pipeline.delay.assert_not_called()
|
||||
|
||||
@@ -10,7 +10,15 @@ import FinalSummary from "./finalSummary";
|
||||
import TranscriptTitle from "../transcriptTitle";
|
||||
import Player from "../player";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { Box, Flex, Grid, GridItem, Skeleton, Text } from "@chakra-ui/react";
|
||||
import {
|
||||
Box,
|
||||
Flex,
|
||||
Grid,
|
||||
GridItem,
|
||||
Skeleton,
|
||||
Text,
|
||||
Spinner,
|
||||
} from "@chakra-ui/react";
|
||||
import { useTranscriptGet } from "../../../lib/apiHooks";
|
||||
import { TranscriptStatus } from "../../../lib/transcript";
|
||||
|
||||
@@ -28,6 +36,7 @@ export default function TranscriptDetails(details: TranscriptDetails) {
|
||||
"idle",
|
||||
"recording",
|
||||
"processing",
|
||||
"uploaded",
|
||||
] satisfies TranscriptStatus[] as TranscriptStatus[];
|
||||
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
@@ -45,15 +54,55 @@ export default function TranscriptDetails(details: TranscriptDetails) {
|
||||
useState<HTMLDivElement | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
if (waiting) {
|
||||
const newUrl = "/transcripts/" + params.transcriptId + "/record";
|
||||
if (!waiting || !transcript.data) return;
|
||||
|
||||
const status = transcript.data.status;
|
||||
let newUrl: string | null = null;
|
||||
|
||||
if (status === "processing" || status === "uploaded") {
|
||||
newUrl = `/transcripts/${params.transcriptId}/processing`;
|
||||
} else if (status === "recording") {
|
||||
newUrl = `/transcripts/${params.transcriptId}/record`;
|
||||
} else if (status === "idle") {
|
||||
newUrl =
|
||||
transcript.data.source_kind === "file"
|
||||
? `/transcripts/${params.transcriptId}/upload`
|
||||
: `/transcripts/${params.transcriptId}/record`;
|
||||
}
|
||||
|
||||
if (newUrl) {
|
||||
// Shallow redirection does not work on NextJS 13
|
||||
// https://github.com/vercel/next.js/discussions/48110
|
||||
// https://github.com/vercel/next.js/discussions/49540
|
||||
router.replace(newUrl);
|
||||
// history.replaceState({}, "", newUrl);
|
||||
}
|
||||
}, [waiting]);
|
||||
}, [waiting, transcript.data?.status, transcript.data?.source_kind]);
|
||||
|
||||
if (waiting) {
|
||||
return (
|
||||
<Box>
|
||||
<Box
|
||||
w="full"
|
||||
background="gray.bg"
|
||||
border={"2px solid"}
|
||||
borderColor={"gray.bg"}
|
||||
borderRadius={8}
|
||||
p={6}
|
||||
minH="100%"
|
||||
display="flex"
|
||||
alignItems="center"
|
||||
justifyContent="center"
|
||||
>
|
||||
<Flex direction="column" align="center" gap={3}>
|
||||
<Spinner size="xl" color="blue.500" />
|
||||
<Text color="gray.600" textAlign="center">
|
||||
Loading transcript...
|
||||
</Text>
|
||||
</Flex>
|
||||
</Box>
|
||||
</Box>
|
||||
);
|
||||
}
|
||||
|
||||
if (transcript.error || topics?.error) {
|
||||
return (
|
||||
|
||||
97
www/app/(app)/transcripts/[transcriptId]/processing/page.tsx
Normal file
97
www/app/(app)/transcripts/[transcriptId]/processing/page.tsx
Normal file
@@ -0,0 +1,97 @@
|
||||
"use client";
|
||||
import { useEffect, use } from "react";
|
||||
import {
|
||||
Heading,
|
||||
Text,
|
||||
VStack,
|
||||
Spinner,
|
||||
Button,
|
||||
Center,
|
||||
} from "@chakra-ui/react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||
|
||||
type TranscriptProcessing = {
|
||||
params: Promise<{
|
||||
transcriptId: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
export default function TranscriptProcessing(details: TranscriptProcessing) {
|
||||
const params = use(details.params);
|
||||
const transcriptId = params.transcriptId;
|
||||
const router = useRouter();
|
||||
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
|
||||
useEffect(() => {
|
||||
const status = transcript.data?.status;
|
||||
if (!status) return;
|
||||
|
||||
if (status === "ended" || status === "error") {
|
||||
router.replace(`/transcripts/${transcriptId}`);
|
||||
} else if (status === "recording") {
|
||||
router.replace(`/transcripts/${transcriptId}/record`);
|
||||
} else if (status === "idle") {
|
||||
const dest =
|
||||
transcript.data?.source_kind === "file"
|
||||
? `/transcripts/${transcriptId}/upload`
|
||||
: `/transcripts/${transcriptId}/record`;
|
||||
router.replace(dest);
|
||||
}
|
||||
}, [
|
||||
transcript.data?.status,
|
||||
transcript.data?.source_kind,
|
||||
router,
|
||||
transcriptId,
|
||||
]);
|
||||
|
||||
if (transcript.isLoading) {
|
||||
return (
|
||||
<VStack align="center" py={8}>
|
||||
<Heading size="lg">Loading transcript...</Heading>
|
||||
</VStack>
|
||||
);
|
||||
}
|
||||
|
||||
if (transcript.error) {
|
||||
return (
|
||||
<VStack align="center" py={8}>
|
||||
<Heading size="lg">Transcript not found</Heading>
|
||||
<Text>We couldn't load this transcript.</Text>
|
||||
</VStack>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<>
|
||||
<VStack
|
||||
align={"left"}
|
||||
minH="100vh"
|
||||
pt={4}
|
||||
mx="auto"
|
||||
w={{ base: "full", md: "container.xl" }}
|
||||
>
|
||||
<Center h={"full"} w="full">
|
||||
<VStack gap={10} bg="gray.100" p={10} borderRadius="md" maxW="500px">
|
||||
<Spinner size="xl" color="blue.500" />
|
||||
<Heading size={"md"} textAlign="center">
|
||||
Processing recording
|
||||
</Heading>
|
||||
<Text color="gray.600" textAlign="center">
|
||||
You can safely return to the library while your recording is being
|
||||
processed.
|
||||
</Text>
|
||||
<Button
|
||||
onClick={() => {
|
||||
router.push("/browse");
|
||||
}}
|
||||
>
|
||||
Browse
|
||||
</Button>
|
||||
</VStack>
|
||||
</Center>
|
||||
</VStack>
|
||||
</>
|
||||
);
|
||||
}
|
||||
@@ -4,7 +4,7 @@ import { useWebSockets } from "../../useWebSockets";
|
||||
import { lockWakeState, releaseWakeState } from "../../../../lib/wakeLock";
|
||||
import { useRouter } from "next/navigation";
|
||||
import useMp3 from "../../useMp3";
|
||||
import { Center, VStack, Text, Heading, Button } from "@chakra-ui/react";
|
||||
import { Center, VStack, Text, Heading } from "@chakra-ui/react";
|
||||
import FileUploadButton from "../../fileUploadButton";
|
||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||
|
||||
@@ -53,6 +53,12 @@ const TranscriptUpload = (details: TranscriptUpload) => {
|
||||
|
||||
const newUrl = "/transcripts/" + params.transcriptId;
|
||||
router.replace(newUrl);
|
||||
} else if (
|
||||
newStatus &&
|
||||
(newStatus == "uploaded" || newStatus == "processing")
|
||||
) {
|
||||
// After upload finishes (or if already processing), redirect to the unified processing page
|
||||
router.replace(`/transcripts/${params.transcriptId}/processing`);
|
||||
}
|
||||
}, [webSockets.status?.value, transcript.data?.status]);
|
||||
|
||||
@@ -71,7 +77,7 @@ const TranscriptUpload = (details: TranscriptUpload) => {
|
||||
<>
|
||||
<VStack
|
||||
align={"left"}
|
||||
h="full"
|
||||
minH="100vh"
|
||||
pt={4}
|
||||
mx="auto"
|
||||
w={{ base: "full", md: "container.xl" }}
|
||||
@@ -79,34 +85,16 @@ const TranscriptUpload = (details: TranscriptUpload) => {
|
||||
<Heading size={"lg"}>Upload meeting</Heading>
|
||||
<Center h={"full"} w="full">
|
||||
<VStack gap={10} bg="gray.100" p={10} borderRadius="md" maxW="500px">
|
||||
{status && status == "idle" && (
|
||||
<>
|
||||
<Text>
|
||||
Please select the file, supported formats: .mp3, m4a, .wav,
|
||||
.mp4, .mov or .webm
|
||||
Please select the file, supported formats: .mp3, m4a, .wav, .mp4,
|
||||
.mov or .webm
|
||||
</Text>
|
||||
<FileUploadButton transcriptId={params.transcriptId} />
|
||||
</>
|
||||
)}
|
||||
{status && status == "uploaded" && (
|
||||
<Text>File is uploaded, processing...</Text>
|
||||
)}
|
||||
{(status == "recording" || status == "processing") && (
|
||||
<>
|
||||
<Heading size={"lg"}>Processing your recording...</Heading>
|
||||
<Text>
|
||||
You can safely return to the library while your file is being
|
||||
processed.
|
||||
</Text>
|
||||
<Button
|
||||
onClick={() => {
|
||||
router.push("/browse");
|
||||
}}
|
||||
>
|
||||
Browse
|
||||
</Button>
|
||||
</>
|
||||
)}
|
||||
<FileUploadButton
|
||||
transcriptId={params.transcriptId}
|
||||
onUploadComplete={() =>
|
||||
router.replace(`/transcripts/${params.transcriptId}/processing`)
|
||||
}
|
||||
/>
|
||||
</VStack>
|
||||
</Center>
|
||||
</VStack>
|
||||
|
||||
60
www/app/(app)/transcripts/buildTranscriptWithTopics.ts
Normal file
60
www/app/(app)/transcripts/buildTranscriptWithTopics.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import type { components } from "../../reflector-api";
|
||||
import { formatTime } from "../../lib/time";
|
||||
|
||||
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
|
||||
type Participant = components["schemas"]["Participant"];
|
||||
|
||||
function getSpeakerName(
|
||||
speakerNumber: number,
|
||||
participants?: Participant[] | null,
|
||||
): string {
|
||||
const name = participants?.find((p) => p.speaker === speakerNumber)?.name;
|
||||
return name && name.trim().length > 0 ? name : `Speaker ${speakerNumber}`;
|
||||
}
|
||||
|
||||
export function buildTranscriptWithTopics(
|
||||
topics: GetTranscriptTopic[],
|
||||
participants?: Participant[] | null,
|
||||
transcriptTitle?: string | null,
|
||||
): string {
|
||||
const blocks: string[] = [];
|
||||
|
||||
if (transcriptTitle && transcriptTitle.trim()) {
|
||||
blocks.push(`# ${transcriptTitle.trim()}`);
|
||||
blocks.push("");
|
||||
}
|
||||
|
||||
for (const topic of topics) {
|
||||
// Topic header
|
||||
const topicTime = formatTime(Math.floor(topic.timestamp || 0));
|
||||
const title = topic.title?.trim() || "Untitled Topic";
|
||||
blocks.push(`## ${title} [${topicTime}]`);
|
||||
|
||||
if (topic.segments && topic.segments.length > 0) {
|
||||
for (const seg of topic.segments) {
|
||||
const ts = formatTime(Math.floor(seg.start || 0));
|
||||
const speaker = getSpeakerName(seg.speaker as number, participants);
|
||||
const text = (seg.text || "").replace(/\s+/g, " ").trim();
|
||||
if (text) {
|
||||
blocks.push(`[${ts}] ${speaker}: ${text}`);
|
||||
}
|
||||
}
|
||||
} else if (topic.transcript) {
|
||||
// Fallback: plain transcript when segments are not present
|
||||
const text = topic.transcript.replace(/\s+/g, " ").trim();
|
||||
if (text) {
|
||||
blocks.push(text);
|
||||
}
|
||||
}
|
||||
|
||||
// Blank line between topics
|
||||
blocks.push("");
|
||||
}
|
||||
|
||||
// Trim trailing blank line
|
||||
while (blocks.length > 0 && blocks[blocks.length - 1] === "") {
|
||||
blocks.pop();
|
||||
}
|
||||
|
||||
return blocks.join("\n");
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import { useError } from "../../(errors)/errorContext";
|
||||
|
||||
type FileUploadButton = {
|
||||
transcriptId: string;
|
||||
onUploadComplete?: () => void;
|
||||
};
|
||||
|
||||
export default function FileUploadButton(props: FileUploadButton) {
|
||||
@@ -31,6 +32,7 @@ export default function FileUploadButton(props: FileUploadButton) {
|
||||
const uploadNextChunk = async () => {
|
||||
if (chunkNumber == totalChunks) {
|
||||
setProgress(0);
|
||||
props.onUploadComplete?.();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ import type { components } from "../../reflector-api";
|
||||
type GetTranscript = components["schemas"]["GetTranscript"];
|
||||
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
|
||||
import { Button, BoxProps, Box } from "@chakra-ui/react";
|
||||
import { buildTranscriptWithTopics } from "./buildTranscriptWithTopics";
|
||||
import { useTranscriptParticipants } from "../../lib/apiHooks";
|
||||
|
||||
type ShareCopyProps = {
|
||||
finalSummaryElement: HTMLDivElement | null;
|
||||
@@ -18,6 +20,7 @@ export default function ShareCopy({
|
||||
}: ShareCopyProps & BoxProps) {
|
||||
const [isCopiedSummary, setIsCopiedSummary] = useState(false);
|
||||
const [isCopiedTranscript, setIsCopiedTranscript] = useState(false);
|
||||
const participantsQuery = useTranscriptParticipants(transcript?.id || null);
|
||||
|
||||
const onCopySummaryClick = () => {
|
||||
const text_to_copy = finalSummaryElement?.innerText;
|
||||
@@ -32,12 +35,12 @@ export default function ShareCopy({
|
||||
};
|
||||
|
||||
const onCopyTranscriptClick = () => {
|
||||
let text_to_copy =
|
||||
topics
|
||||
?.map((topic) => topic.transcript)
|
||||
.join("\n\n")
|
||||
.replace(/ +/g, " ")
|
||||
.trim() || "";
|
||||
const text_to_copy =
|
||||
buildTranscriptWithTopics(
|
||||
topics || [],
|
||||
participantsQuery?.data || null,
|
||||
transcript?.title || null,
|
||||
) || "";
|
||||
|
||||
text_to_copy &&
|
||||
navigator.clipboard.writeText(text_to_copy).then(() => {
|
||||
|
||||
@@ -4,10 +4,15 @@ import type { components } from "../../reflector-api";
|
||||
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
|
||||
type GetTranscript = components["schemas"]["GetTranscript"];
|
||||
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
|
||||
import { useTranscriptUpdate } from "../../lib/apiHooks";
|
||||
import {
|
||||
useTranscriptUpdate,
|
||||
useTranscriptParticipants,
|
||||
} from "../../lib/apiHooks";
|
||||
import { Heading, IconButton, Input, Flex, Spacer } from "@chakra-ui/react";
|
||||
import { LuPen } from "react-icons/lu";
|
||||
import { LuPen, LuCopy, LuCheck } from "react-icons/lu";
|
||||
import ShareAndPrivacy from "./shareAndPrivacy";
|
||||
import { buildTranscriptWithTopics } from "./buildTranscriptWithTopics";
|
||||
import { toaster } from "../../components/ui/toaster";
|
||||
|
||||
type TranscriptTitle = {
|
||||
title: string;
|
||||
@@ -25,6 +30,9 @@ const TranscriptTitle = (props: TranscriptTitle) => {
|
||||
const [preEditTitle, setPreEditTitle] = useState(props.title);
|
||||
const [isEditing, setIsEditing] = useState(false);
|
||||
const updateTranscriptMutation = useTranscriptUpdate();
|
||||
const participantsQuery = useTranscriptParticipants(
|
||||
props.transcript?.id || null,
|
||||
);
|
||||
|
||||
const updateTitle = async (newTitle: string, transcriptId: string) => {
|
||||
try {
|
||||
@@ -118,11 +126,57 @@ const TranscriptTitle = (props: TranscriptTitle) => {
|
||||
<LuPen />
|
||||
</IconButton>
|
||||
{props.transcript && props.topics && (
|
||||
<>
|
||||
<IconButton
|
||||
aria-label="Copy Transcript"
|
||||
size="sm"
|
||||
variant="subtle"
|
||||
onClick={() => {
|
||||
const text = buildTranscriptWithTopics(
|
||||
props.topics || [],
|
||||
participantsQuery?.data || null,
|
||||
props.transcript?.title || null,
|
||||
);
|
||||
if (!text) return;
|
||||
navigator.clipboard
|
||||
.writeText(text)
|
||||
.then(() => {
|
||||
toaster
|
||||
.create({
|
||||
placement: "top",
|
||||
duration: 2500,
|
||||
render: () => (
|
||||
<div className="chakra-ui-light">
|
||||
<div
|
||||
style={{
|
||||
background: "#38A169",
|
||||
color: "white",
|
||||
padding: "8px 12px",
|
||||
borderRadius: 6,
|
||||
display: "flex",
|
||||
alignItems: "center",
|
||||
gap: 8,
|
||||
boxShadow: "rgba(0,0,0,0.25) 0px 4px 12px",
|
||||
}}
|
||||
>
|
||||
<LuCheck /> Transcript copied
|
||||
</div>
|
||||
</div>
|
||||
),
|
||||
})
|
||||
.then(() => {});
|
||||
})
|
||||
.catch(() => {});
|
||||
}}
|
||||
>
|
||||
<LuCopy />
|
||||
</IconButton>
|
||||
<ShareAndPrivacy
|
||||
finalSummaryElement={props.finalSummaryElement}
|
||||
transcript={props.transcript}
|
||||
topics={props.topics}
|
||||
/>
|
||||
</>
|
||||
)}
|
||||
</Flex>
|
||||
)}
|
||||
|
||||
Reference in New Issue
Block a user