diff --git a/.gitignore b/.gitignore index e705c6b7..b9afc634 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,8 @@ server/exportdanswer .vercel .env*.local dump.rdb +.yarn +ngrok.log +.claude/settings.local.json +restart-dev.sh +*.log \ No newline at end of file diff --git a/server/.env_template b/server/.env_template index 8252dfdd..2c79d80d 100644 --- a/server/.env_template +++ b/server/.env_template @@ -18,3 +18,4 @@ DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run BASE_URL=https://xxxxx.ngrok.app DIARIZATION_ENABLED=false +SQS_POLLING_TIMEOUT_SECONDS=60 diff --git a/server/README.md b/server/README.md index e69de29b..74675085 100644 --- a/server/README.md +++ b/server/README.md @@ -0,0 +1,22 @@ +## AWS S3/SQS usage clarification + +Whereby.com uploads recordings directly to our S3 bucket when meetings end. + +SQS Queue (AWS_PROCESS_RECORDING_QUEUE_URL) + +Filled by: AWS S3 Event Notifications + +The S3 bucket is configured to send notifications to our SQS queue when new objects are created. This is standard AWS infrastructure - not in our codebase. + +AWS S3 → SQS Event Configuration: +- Event Type: s3:ObjectCreated:* +- Filter: *.mp4 files +- Destination: Our SQS queue + +Our System's Role + +Polls SQS every 60 seconds via /server/reflector/worker/process.py:24-62: + +# Every 60 seconds, check for new recordings +sqs = boto3.client("sqs", ...) +response = sqs.receive_message(QueueUrl=queue_url, ...) diff --git a/server/migrations/versions/20250617140003_add_meeting_consent_table.py b/server/migrations/versions/20250617140003_add_meeting_consent_table.py new file mode 100644 index 00000000..ae85219c --- /dev/null +++ b/server/migrations/versions/20250617140003_add_meeting_consent_table.py @@ -0,0 +1,34 @@ +"""add meeting consent table + +Revision ID: 20250617140003 +Revises: f819277e5169 +Create Date: 2025-06-17 14:00:03.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "20250617140003" +down_revision: Union[str, None] = "d3ff3a39297f" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + 'meeting_consent', + sa.Column('id', sa.String(), nullable=False), + sa.Column('meeting_id', sa.String(), nullable=False), + sa.Column('user_id', sa.String(), nullable=True), + sa.Column('consent_given', sa.Boolean(), nullable=False), + sa.Column('consent_timestamp', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['meeting_id'], ['meeting.id']), + ) + + +def downgrade() -> None: + op.drop_table('meeting_consent') \ No newline at end of file diff --git a/server/migrations/versions/20250618140000_add_audio_deleted_field_to_transcript.py b/server/migrations/versions/20250618140000_add_audio_deleted_field_to_transcript.py new file mode 100644 index 00000000..928e8183 --- /dev/null +++ b/server/migrations/versions/20250618140000_add_audio_deleted_field_to_transcript.py @@ -0,0 +1,25 @@ +"""add audio_deleted field to transcript + +Revision ID: 20250618140000 +Revises: 20250617140003 +Create Date: 2025-06-18 14:00:00.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "20250618140000" +down_revision: Union[str, None] = "20250617140003" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("transcript", sa.Column("audio_deleted", sa.Boolean(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("transcript", "audio_deleted") \ No newline at end of file diff --git a/server/migrations/versions/b3df9681cae9_add_source_and_target_language.py b/server/migrations/versions/b3df9681cae9_add_source_and_target_language.py deleted file mode 100644 index ed8a85b2..00000000 --- a/server/migrations/versions/b3df9681cae9_add_source_and_target_language.py +++ /dev/null @@ -1,32 +0,0 @@ -"""add source and target language - -Revision ID: b3df9681cae9 -Revises: 543ed284d69a -Create Date: 2023-08-29 10:55:37.690469 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision: str = 'b3df9681cae9' -down_revision: Union[str, None] = '543ed284d69a' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.add_column('transcript', sa.Column('source_language', sa.String(), nullable=True)) - op.add_column('transcript', sa.Column('target_language', sa.String(), nullable=True)) - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.drop_column('transcript', 'target_language') - op.drop_column('transcript', 'source_language') - # ### end Alembic commands ### diff --git a/server/reflector/app.py b/server/reflector/app.py index 079a5efe..d10fc744 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown, subscribers_startup from reflector.logger import logger from reflector.metrics import metrics_init from reflector.settings import settings +from reflector.views.meetings import router as meetings_router from reflector.views.rooms import router as rooms_router from reflector.views.rtc_offer import router as rtc_offer_router from reflector.views.transcripts import router as transcripts_router @@ -71,6 +72,7 @@ metrics_init(app, instrumentator) # register views app.include_router(rtc_offer_router) +app.include_router(meetings_router, prefix="/v1") app.include_router(rooms_router, prefix="/v1") app.include_router(transcripts_router, prefix="/v1") app.include_router(transcripts_audio_router, prefix="/v1") diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 6852d5b0..e6830460 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -3,9 +3,10 @@ from typing import Literal import sqlalchemy as sa from fastapi import HTTPException -from pydantic import BaseModel +from pydantic import BaseModel, Field from reflector.db import database, metadata from reflector.db.rooms import Room +from reflector.utils import generate_uuid4 meetings = sa.Table( "meeting", @@ -41,6 +42,24 @@ meetings = sa.Table( ), ) +meeting_consent = sa.Table( + "meeting_consent", + metadata, + sa.Column("id", sa.String, primary_key=True), + sa.Column("meeting_id", sa.String, sa.ForeignKey("meeting.id")), + sa.Column("user_id", sa.String, nullable=True), + sa.Column("consent_given", sa.Boolean), + sa.Column("consent_timestamp", sa.DateTime), +) + + +class MeetingConsent(BaseModel): + id: str = Field(default_factory=generate_uuid4) + meeting_id: str + user_id: str | None = None + consent_given: bool + consent_timestamp: datetime + class Meeting(BaseModel): id: str @@ -116,7 +135,7 @@ class MeetingController: async def get_active(self, room: Room, current_time: datetime) -> Meeting: """ - Get latest meeting for a room. + Get latest active meeting for a room. """ end_date = getattr(meetings.c, "end_date") query = ( @@ -125,6 +144,7 @@ class MeetingController: sa.and_( meetings.c.room_id == room.id, meetings.c.end_date > current_time, + meetings.c.is_active, ) ) .order_by(end_date.desc()) @@ -167,4 +187,63 @@ class MeetingController: await database.execute(query) +class MeetingConsentController: + async def get_by_meeting_id(self, meeting_id: str) -> list[MeetingConsent]: + query = meeting_consent.select().where( + meeting_consent.c.meeting_id == meeting_id + ) + results = await database.fetch_all(query) + return [MeetingConsent(**result) for result in results] + + async def get_by_meeting_and_user( + self, meeting_id: str, user_id: str + ) -> MeetingConsent | None: + """Get existing consent for a specific user and meeting""" + query = meeting_consent.select().where( + meeting_consent.c.meeting_id == meeting_id, + meeting_consent.c.user_id == user_id, + ) + result = await database.fetch_one(query) + if result is None: + return None + return MeetingConsent(**result) if result else None + + async def upsert(self, consent: MeetingConsent) -> MeetingConsent: + """Create new consent or update existing one for authenticated users""" + if consent.user_id: + # For authenticated users, check if consent already exists + # not transactional but we're ok with that; the consents ain't deleted anyways + existing = await self.get_by_meeting_and_user( + consent.meeting_id, consent.user_id + ) + if existing: + query = ( + meeting_consent.update() + .where(meeting_consent.c.id == existing.id) + .values( + consent_given=consent.consent_given, + consent_timestamp=consent.consent_timestamp, + ) + ) + await database.execute(query) + + existing.consent_given = consent.consent_given + existing.consent_timestamp = consent.consent_timestamp + return existing + + query = meeting_consent.insert().values(**consent.model_dump()) + await database.execute(query) + return consent + + async def has_any_denial(self, meeting_id: str) -> bool: + """Check if any participant denied consent for this meeting""" + query = meeting_consent.select().where( + meeting_consent.c.meeting_id == meeting_id, + meeting_consent.c.consent_given.is_(False), + ) + result = await database.fetch_one(query) + return result is not None + + meetings_controller = MeetingController() +meeting_consent_controller = MeetingConsentController() diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index 254e612a..31670609 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -1,10 +1,10 @@ from datetime import datetime from typing import Literal -from uuid import uuid4 import sqlalchemy as sa from pydantic import BaseModel, Field from reflector.db import database, metadata +from reflector.utils import generate_uuid4 recordings = sa.Table( "recording", @@ -23,10 +23,6 @@ recordings = sa.Table( ) -def generate_uuid4() -> str: - return str(uuid4()) - - class Recording(BaseModel): id: str = Field(default_factory=generate_uuid4) bucket_name: str diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py index 16a8fb7a..27837eb1 100644 --- a/server/reflector/db/rooms.py +++ b/server/reflector/db/rooms.py @@ -6,7 +6,7 @@ import sqlalchemy from fastapi import HTTPException from pydantic import BaseModel, Field from reflector.db import database, metadata -from reflector.db.transcripts import generate_uuid4 +from reflector.utils import generate_uuid4 from sqlalchemy.sql import false, or_ rooms = sqlalchemy.Table( diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index b9ffe0d2..67f66aac 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -6,7 +6,7 @@ from contextlib import asynccontextmanager from datetime import datetime from pathlib import Path from typing import Any, Literal -from uuid import uuid4 +from reflector.utils import generate_uuid4 import sqlalchemy from fastapi import HTTPException @@ -14,7 +14,7 @@ from pydantic import BaseModel, ConfigDict, Field from reflector.db import database, metadata from reflector.processors.types import Word as ProcessorWord from reflector.settings import settings -from reflector.storage import Storage +from reflector.storage import get_transcripts_storage from sqlalchemy import Enum from sqlalchemy.sql import false, or_ @@ -70,25 +70,18 @@ transcripts = sqlalchemy.Table( Enum(SourceKind, values_callable=lambda obj: [e.value for e in obj]), nullable=False, ), + # indicative field: whether associated audio is deleted + # the main "audio deleted" is the presence of the audio itself / consents not-given + # same field could've been in recording/meeting, and it's maybe even ok to dupe it at need + sqlalchemy.Column("audio_deleted", sqlalchemy.Boolean, nullable=True), ) -def generate_uuid4() -> str: - return str(uuid4()) - - def generate_transcript_name() -> str: now = datetime.utcnow() return f"Transcript {now.strftime('%Y-%m-%d %H:%M:%S')}" -def get_storage() -> Storage: - return Storage.get_instance( - name=settings.TRANSCRIPT_STORAGE_BACKEND, - settings_prefix="TRANSCRIPT_STORAGE_", - ) - - class AudioWaveform(BaseModel): data: list[float] @@ -169,6 +162,7 @@ class Transcript(BaseModel): recording_id: str | None = None zulip_message_id: int | None = None source_kind: SourceKind + audio_deleted: bool | None = None def add_event(self, event: str, data: BaseModel) -> TranscriptEvent: ev = TranscriptEvent(event=event, data=data.model_dump()) @@ -257,7 +251,7 @@ class Transcript(BaseModel): raise Exception(f"Unknown audio location {self.audio_location}") async def _generate_storage_audio_link(self) -> str: - return await get_storage().get_file_url(self.storage_audio_path) + return await get_transcripts_storage().get_file_url(self.storage_audio_path) def _generate_local_audio_link(self) -> str: # we need to create an url to be used for diarization @@ -542,7 +536,7 @@ class TranscriptController: topic: TranscriptTopic, ) -> TranscriptEvent: """ - Append an event to a transcript + Upsert topics to a transcript """ transcript.upsert_topic(topic) await self.update( @@ -556,9 +550,19 @@ class TranscriptController: Move mp3 file to storage """ + if transcript.audio_deleted: + raise FileNotFoundError( + f"Invalid state of transcript {transcript.id}: audio_deleted mark is set true" + ) + if transcript.audio_location == "local": # store the audio on external storage if it's not already there - await get_storage().put_file( + if not transcript.audio_mp3_filename.exists(): + raise FileNotFoundError( + f"Audio file not found: {transcript.audio_mp3_filename}" + ) + + await get_transcripts_storage().put_file( transcript.storage_audio_path, transcript.audio_mp3_filename.read_bytes(), ) @@ -574,7 +578,7 @@ class TranscriptController: Download audio from storage """ transcript.audio_mp3_filename.write_bytes( - await get_storage().get_file( + await get_transcripts_storage().get_file( transcript.storage_audio_path, ) ) diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index b42bcbad..aca28586 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -59,6 +59,12 @@ from reflector.zulip import ( send_message_to_zulip, update_zulip_message, ) + +from reflector.db.meetings import meeting_consent_controller +from reflector.storage import get_transcripts_storage + +import boto3 + from structlog import BoundLogger as Logger @@ -470,6 +476,7 @@ class PipelineMainWaveform(PipelineMainFromTopics): @get_transcript async def pipeline_remove_upload(transcript: Transcript, logger: Logger): + # for future changes: note that there's also a consent process happens, beforehand and users may not consent with keeping files. currently, we delete regardless, so it's no need for that logger.info("Starting remove upload") uploads = transcript.data_path.glob("upload.*") for upload in uploads: @@ -520,6 +527,10 @@ async def pipeline_upload_mp3(transcript: Transcript, logger: Logger): logger.info("No storage backend configured, skipping mp3 upload") return + if transcript.audio_deleted: + logger.info("Skipping mp3 upload - audio marked as deleted") + return + logger.info("Starting upload mp3") # If the audio mp3 is not available, just skip @@ -558,6 +569,74 @@ async def pipeline_summaries(transcript: Transcript, logger: Logger): logger.info("Summaries done") +@get_transcript +async def cleanup_consent(transcript: Transcript, logger: Logger): + logger.info("Starting consent cleanup") + + consent_denied = False + recording = None + try: + if transcript.recording_id: + recording = await recordings_controller.get_by_id(transcript.recording_id) + if recording and recording.meeting_id: + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if meeting: + consent_denied = await meeting_consent_controller.has_any_denial( + meeting.id + ) + except Exception as e: + logger.error(f"Failed to get fetch consent: {e}") + consent_denied = True + + if not consent_denied: + logger.info("Consent approved, keeping all files") + return + + logger.info("Consent denied, cleaning up all related audio files") + + if recording and recording.bucket_name and recording.object_key: + + s3_whereby = boto3.client( + "s3", + aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID, + aws_secret_access_key=settings.AWS_WHEREBY_ACCESS_KEY_SECRET, + ) + try: + s3_whereby.delete_object( + Bucket=recording.bucket_name, Key=recording.object_key + ) + logger.info( + f"Deleted original Whereby recording: {recording.bucket_name}/{recording.object_key}" + ) + except Exception as e: + logger.error(f"Failed to delete Whereby recording: {e}") + + # non-transactional, files marked for deletion not actually deleted is possible + await transcripts_controller.update(transcript, {"audio_deleted": True}) + # 2. Delete processed audio from transcript storage S3 bucket + if transcript.audio_location == "storage": + + storage = get_transcripts_storage() + try: + await storage.delete_file(transcript.storage_audio_path) + logger.info( + f"Deleted processed audio from storage: {transcript.storage_audio_path}" + ) + except Exception as e: + logger.error(f"Failed to delete processed audio: {e}") + + # 3. Delete local audio files + try: + if hasattr(transcript, "audio_mp3_filename") and transcript.audio_mp3_filename: + transcript.audio_mp3_filename.unlink(missing_ok=True) + if hasattr(transcript, "audio_wav_filename") and transcript.audio_wav_filename: + transcript.audio_wav_filename.unlink(missing_ok=True) + except Exception as e: + logger.error(f"Failed to delete local audio files: {e}") + + logger.info("Consent cleanup done") + + @get_transcript async def pipeline_post_to_zulip(transcript: Transcript, logger: Logger): logger.info("Starting post to zulip") @@ -659,6 +738,12 @@ async def task_pipeline_final_summaries(*, transcript_id: str): await pipeline_summaries(transcript_id=transcript_id) +@shared_task +@asynctask +async def task_cleanup_consent(*, transcript_id: str): + await cleanup_consent(transcript_id=transcript_id) + + @shared_task @asynctask async def task_pipeline_post_to_zulip(*, transcript_id: str): @@ -675,6 +760,7 @@ def pipeline_post(*, transcript_id: str): | task_pipeline_upload_mp3.si(transcript_id=transcript_id) | task_pipeline_remove_upload.si(transcript_id=transcript_id) | task_pipeline_diarization.si(transcript_id=transcript_id) + | task_cleanup_consent.si(transcript_id=transcript_id) ) chain_title_preview = task_pipeline_title.si(transcript_id=transcript_id) chain_final_summaries = task_pipeline_final_summaries.si( diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 9fd8748e..54a8f87e 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -138,6 +138,7 @@ class Settings(BaseSettings): HEALTHCHECK_URL: str | None = None AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None + SQS_POLLING_TIMEOUT_SECONDS: int = 60 WHEREBY_API_URL: str = "https://api.whereby.dev/v1" diff --git a/server/reflector/storage/__init__.py b/server/reflector/storage/__init__.py index fd4c72f0..ee6c7318 100644 --- a/server/reflector/storage/__init__.py +++ b/server/reflector/storage/__init__.py @@ -1 +1,10 @@ from .base import Storage # noqa + + +def get_transcripts_storage() -> Storage: + from reflector.settings import settings + + return Storage.get_instance( + name=settings.TRANSCRIPT_STORAGE_BACKEND, + settings_prefix="TRANSCRIPT_STORAGE_", + ) diff --git a/server/reflector/utils/__init__.py b/server/reflector/utils/__init__.py index e69de29b..909f735b 100644 --- a/server/reflector/utils/__init__.py +++ b/server/reflector/utils/__init__.py @@ -0,0 +1,5 @@ +from uuid import uuid4 + + +def generate_uuid4() -> str: + return str(uuid4()) diff --git a/server/reflector/views/_range_requests_response.py b/server/reflector/views/_range_requests_response.py index 2fac632d..8e3770ae 100644 --- a/server/reflector/views/_range_requests_response.py +++ b/server/reflector/views/_range_requests_response.py @@ -43,6 +43,11 @@ def range_requests_response( ): """Returns StreamingResponse using Range Requests of a given file""" + if not os.path.exists(file_path): + from fastapi import HTTPException + + raise HTTPException(status_code=404, detail="File not found") + file_size = os.stat(file_path).st_size range_header = request.headers.get("range") diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py new file mode 100644 index 00000000..832b7c29 --- /dev/null +++ b/server/reflector/views/meetings.py @@ -0,0 +1,43 @@ +from datetime import datetime +from typing import Annotated, Optional + +import reflector.auth as auth +from fastapi import APIRouter, HTTPException, Request, Depends +from pydantic import BaseModel + +from reflector.db.meetings import ( + MeetingConsent, + meeting_consent_controller, + meetings_controller, +) + +router = APIRouter() + + +class MeetingConsentRequest(BaseModel): + consent_given: bool + + +@router.post("/meetings/{meeting_id}/consent") +async def meeting_audio_consent( + meeting_id: str, + request: MeetingConsentRequest, + user_request: Request, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +): + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + user_id = user["sub"] if user else None + + consent = MeetingConsent( + meeting_id=meeting_id, + user_id=user_id, + consent_given=request.consent_given, + consent_timestamp=datetime.utcnow(), + ) + + updated_consent = await meeting_consent_controller.upsert(consent) + + return {"status": "success", "consent_id": updated_consent.id} diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index f9f7f4eb..9592fa31 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import Annotated, Optional +from typing import Annotated, Optional, Literal import reflector.auth as auth from fastapi import APIRouter, Depends, HTTPException @@ -37,6 +37,7 @@ class Meeting(BaseModel): host_room_url: str start_date: datetime end_date: datetime + recording_type: Literal["none", "local", "cloud"] = "cloud" class CreateRoom(BaseModel): diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index b6e56c44..27d6188e 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -65,6 +65,7 @@ class GetTranscript(BaseModel): source_kind: SourceKind room_id: str | None = None room_name: str | None = None + audio_deleted: bool | None = None class CreateTranscript(BaseModel): @@ -82,6 +83,7 @@ class UpdateTranscript(BaseModel): share_mode: Optional[Literal["public", "semi-private", "private"]] = Field(None) participants: Optional[list[TranscriptParticipant]] = Field(None) reviewed: Optional[bool] = Field(None) + audio_deleted: Optional[bool] = Field(None) class DeletionStatus(BaseModel): diff --git a/server/reflector/views/transcripts_audio.py b/server/reflector/views/transcripts_audio.py index 45d4eccc..c457f472 100644 --- a/server/reflector/views/transcripts_audio.py +++ b/server/reflector/views/transcripts_audio.py @@ -86,8 +86,17 @@ async def transcript_get_audio_mp3( headers=resp.headers, ) - if not transcript.audio_mp3_filename.exists(): - raise HTTPException(status_code=500, detail="Audio not found") + if transcript.audio_deleted: + raise HTTPException( + status_code=404, detail="Audio unavailable due to privacy settings" + ) + + if ( + not hasattr(transcript, "audio_mp3_filename") + or not transcript.audio_mp3_filename + or not transcript.audio_mp3_filename.exists() + ): + raise HTTPException(status_code=404, detail="Audio file not found") truncated_id = str(transcript.id).split("-")[0] filename = f"recording_{truncated_id}.mp3" diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index e8bad4ab..bf32ee95 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -25,11 +25,11 @@ else: app.conf.beat_schedule = { "process_messages": { "task": "reflector.worker.process.process_messages", - "schedule": 60.0, + "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS), }, "process_meetings": { "task": "reflector.worker.process.process_meetings", - "schedule": 60.0, + "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS), }, "reprocess_failed_recordings": { "task": "reflector.worker.process.reprocess_failed_recordings", diff --git a/www/app/(app)/transcripts/[transcriptId]/correct/topicPlayer.tsx b/www/app/(app)/transcripts/[transcriptId]/correct/topicPlayer.tsx index bdd9e5ea..3d07e3d9 100644 --- a/www/app/(app)/transcripts/[transcriptId]/correct/topicPlayer.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/correct/topicPlayer.tsx @@ -183,7 +183,18 @@ const TopicPlayer = ({ setIsPlaying(false); }; - const isLoaded = !!(mp3.media && topicTime); + const isLoaded = !mp3.loading && !!topicTime + const error = mp3.error; + if (error !== null) { + return + Loading error: {error} + + } + if (mp3.audioDeleted) { + return + This topic file has been deleted. + + } return ( ; } + if (mp3.error) { + return ( + + ); + } + + + return ( <> - {waveform.waveform && mp3.media && topics.topics ? ( + {waveform.waveform && mp3.media && !mp3.audioDeleted && topics.topics ? ( ) : waveform.error ? ( -
"error loading this recording"
+
error loading this recording
+ ) : mp3.audioDeleted ? ( +
Audio was deleted
) : ( )} diff --git a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx index 2d227f57..52072938 100644 --- a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx @@ -8,7 +8,6 @@ import "../../../../styles/button.css"; import { Topic } from "../../webSocketTypes"; import { lockWakeState, releaseWakeState } from "../../../../lib/wakeLock"; import { useRouter } from "next/navigation"; -import Player from "../../player"; import useMp3 from "../../useMp3"; import WaveformLoading from "../../waveformLoading"; import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react"; @@ -27,7 +26,7 @@ const TranscriptRecord = (details: TranscriptDetails) => { const webSockets = useWebSockets(details.params.transcriptId); - let mp3 = useMp3(details.params.transcriptId, true); + const mp3 = useMp3(details.params.transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx index c6e9eb69..bc9ff77a 100644 --- a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx @@ -21,7 +21,7 @@ const TranscriptUpload = (details: TranscriptUpload) => { const webSockets = useWebSockets(details.params.transcriptId); - let mp3 = useMp3(details.params.transcriptId, true); + const mp3 = useMp3(details.params.transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/new/page.tsx b/www/app/(app)/transcripts/new/page.tsx index 9d12ddda..2571edbf 100644 --- a/www/app/(app)/transcripts/new/page.tsx +++ b/www/app/(app)/transcripts/new/page.tsx @@ -43,6 +43,8 @@ import { Input, } from "@chakra-ui/react"; const TranscriptCreate = () => { + + const isClient = typeof window !== 'undefined'; const router = useRouter(); const { isLoading, isAuthenticated } = useSessionStatus(); const requireLogin = featureEnabled("requireLogin"); @@ -123,12 +125,12 @@ const TranscriptCreate = () => { Reflector is a transcription and summarization pipeline that transforms audio into knowledge. - + The output is meeting minutes and topic summaries enabling topic-specific analyses stored in your systems of record. This is accomplished on your infrastructure – without 3rd parties – keeping your data private, secure, and organized. - + @@ -179,24 +181,26 @@ const TranscriptCreate = () => { placeholder="Choose your language" /> - {loading ? ( - Checking permissions... - ) : permissionOk ? ( - - ) : permissionDenied ? ( - - Permission to use your microphone was denied, please change - the permission setting in your browser and refresh this - page. - + {isClient && !loading ? ( + permissionOk ? ( + + ) : permissionDenied ? ( + + Permission to use your microphone was denied, please change + the permission setting in your browser and refresh this + page. + + ) : ( + + ) ) : ( - + Checking permissions... )} + ); + }; + + return ( + + + + Can we have your permission to store this meeting's audio recording on our servers? + + + + + + + + ); + }, + onCloseComplete: () => { + setModalOpen(false); + } + }); + + // Handle escape key to close the toast + const handleKeyDown = (event: KeyboardEvent) => { + if (event.key === 'Escape') { + toast.close(toastId); + } + }; + + document.addEventListener('keydown', handleKeyDown); + + const cleanup = () => { + toast.close(toastId); + document.removeEventListener('keydown', handleKeyDown); + }; + + return cleanup; + }, [meetingId, toast, handleConsent, wherebyRef, modalOpen]); + + return { showConsentModal, consentState, hasConsent, consentLoading }; +} + +function ConsentDialogButton({ meetingId, wherebyRef }: { meetingId: string; wherebyRef: React.RefObject }) { + const { showConsentModal, consentState, hasConsent, consentLoading } = useConsentDialog(meetingId, wherebyRef); + + if (!consentState.ready || hasConsent(meetingId) || consentLoading) { + return null; + } + + return ( + + ); +} + +const recordingTypeRequiresConsent = (recordingType: NonNullable) => { + return recordingType === 'cloud'; +} + +// next throws even with "use client" +const useWhereby = () => { + const [wherebyLoaded, setWherebyLoaded] = useState(false); + useEffect(() => { + if (typeof window !== 'undefined') { + import("@whereby.com/browser-sdk/embed").then(() => { + setWherebyLoaded(true); + }).catch(console.error.bind(console)); + } + }, []); + return wherebyLoaded; +} + export default function Room(details: RoomDetails) { + const wherebyLoaded = useWhereby(); const wherebyRef = useRef(null); const roomName = details.params.roomName; const meeting = useRoomMeeting(roomName); const router = useRouter(); const { isLoading, isAuthenticated } = useSessionStatus(); - const [consentGiven, setConsentGiven] = useState(null); - const roomUrl = meeting?.response?.host_room_url ? meeting?.response?.host_room_url : meeting?.response?.room_url; + const meetingId = meeting?.response?.id; + + const recordingType = meeting?.response?.recording_type; + const handleLeave = useCallback(() => { router.push("/browse"); }, [router]); - const handleConsent = (consent: boolean) => { - setConsentGiven(consent); - }; - useEffect(() => { if ( !isLoading && @@ -47,14 +221,14 @@ export default function Room(details: RoomDetails) { }, [isLoading, meeting?.error]); useEffect(() => { - if (isLoading || !isAuthenticated || !roomUrl) return; + if (isLoading || !isAuthenticated || !roomUrl || !wherebyLoaded) return; wherebyRef.current?.addEventListener("leave", handleLeave); return () => { wherebyRef.current?.removeEventListener("leave", handleLeave); }; - }, [handleLeave, roomUrl, isLoading, isAuthenticated]); + }, [handleLeave, roomUrl, isLoading, isAuthenticated, wherebyLoaded]); if (isLoading) { return ( @@ -77,60 +251,18 @@ export default function Room(details: RoomDetails) { ); } - if (!isAuthenticated && !consentGiven) { - return ( - - - {consentGiven === null ? ( - <> - - This meeting may be recorded. Do you consent to being recorded? - - - - - - - ) : ( - <> - - You cannot join the meeting without consenting to being - recorded. - - - )} - - - ); - } return ( <> - {roomUrl && ( - + {roomUrl && meetingId && wherebyLoaded && ( + <> + + {recordingType && recordingTypeRequiresConsent(recordingType) && } + )} ); diff --git a/www/app/[roomName]/useRoomMeeting.tsx b/www/app/[roomName]/useRoomMeeting.tsx index 84ca1145..656945c9 100644 --- a/www/app/[roomName]/useRoomMeeting.tsx +++ b/www/app/[roomName]/useRoomMeeting.tsx @@ -49,7 +49,6 @@ const useRoomMeeting = ( .then((result) => { setResponse(result); setLoading(false); - console.debug("Meeting Loaded:", result); }) .catch((error) => { const shouldShowHuman = shouldShowError(error); diff --git a/www/app/api/schemas.gen.ts b/www/app/api/schemas.gen.ts index c9b5e28d..8b42c8a7 100644 --- a/www/app/api/schemas.gen.ts +++ b/www/app/api/schemas.gen.ts @@ -293,6 +293,17 @@ export const $GetTranscript = { ], title: "Room Name", }, + audio_deleted: { + anyOf: [ + { + type: "boolean", + }, + { + type: "null", + }, + ], + title: "Audio Deleted", + }, }, type: "object", required: [ @@ -535,6 +546,12 @@ export const $Meeting = { format: "date-time", title: "End Date", }, + recording_type: { + type: "string", + enum: ["none", "local", "cloud"], + title: "Recording Type", + default: "cloud", + }, }, type: "object", required: [ @@ -548,6 +565,18 @@ export const $Meeting = { title: "Meeting", } as const; +export const $MeetingConsentRequest = { + properties: { + consent_given: { + type: "boolean", + title: "Consent Given", + }, + }, + type: "object", + required: ["consent_given"], + title: "MeetingConsentRequest", +} as const; + export const $Page_GetTranscript_ = { properties: { items: { @@ -1097,6 +1126,17 @@ export const $UpdateTranscript = { ], title: "Reviewed", }, + audio_deleted: { + anyOf: [ + { + type: "boolean", + }, + { + type: "null", + }, + ], + title: "Audio Deleted", + }, }, type: "object", title: "UpdateTranscript", @@ -1166,6 +1206,35 @@ export const $ValidationError = { title: "ValidationError", } as const; +export const $WherebyWebhookEvent = { + properties: { + apiVersion: { + type: "string", + title: "Apiversion", + }, + id: { + type: "string", + title: "Id", + }, + createdAt: { + type: "string", + format: "date-time", + title: "Createdat", + }, + type: { + type: "string", + title: "Type", + }, + data: { + type: "object", + title: "Data", + }, + }, + type: "object", + required: ["apiVersion", "id", "createdAt", "type", "data"], + title: "WherebyWebhookEvent", +} as const; + export const $Word = { properties: { text: { diff --git a/www/app/api/services.gen.ts b/www/app/api/services.gen.ts index acf1b71f..a91155d1 100644 --- a/www/app/api/services.gen.ts +++ b/www/app/api/services.gen.ts @@ -4,6 +4,8 @@ import type { CancelablePromise } from "./core/CancelablePromise"; import type { BaseHttpRequest } from "./core/BaseHttpRequest"; import type { MetricsResponse, + V1MeetingAudioConsentData, + V1MeetingAudioConsentResponse, V1RoomsListData, V1RoomsListResponse, V1RoomsCreateData, @@ -64,6 +66,8 @@ import type { V1ZulipGetStreamsResponse, V1ZulipGetTopicsData, V1ZulipGetTopicsResponse, + V1WherebyWebhookData, + V1WherebyWebhookResponse, } from "./types.gen"; export class DefaultService { @@ -82,6 +86,31 @@ export class DefaultService { }); } + /** + * Meeting Audio Consent + * @param data The data for the request. + * @param data.meetingId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public v1MeetingAudioConsent( + data: V1MeetingAudioConsentData, + ): CancelablePromise { + return this.httpRequest.request({ + method: "POST", + url: "/v1/meetings/{meeting_id}/consent", + path: { + meeting_id: data.meetingId, + }, + body: data.requestBody, + mediaType: "application/json", + errors: { + 422: "Validation Error", + }, + }); + } + /** * Rooms List * @param data The data for the request. @@ -807,4 +836,25 @@ export class DefaultService { }, }); } + + /** + * Whereby Webhook + * @param data The data for the request. + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public v1WherebyWebhook( + data: V1WherebyWebhookData, + ): CancelablePromise { + return this.httpRequest.request({ + method: "POST", + url: "/v1/whereby", + body: data.requestBody, + mediaType: "application/json", + errors: { + 422: "Validation Error", + }, + }); + } } diff --git a/www/app/api/types.gen.ts b/www/app/api/types.gen.ts index 9b456648..38b77c69 100644 --- a/www/app/api/types.gen.ts +++ b/www/app/api/types.gen.ts @@ -56,6 +56,7 @@ export type GetTranscript = { source_kind: SourceKind; room_id?: string | null; room_name?: string | null; + audio_deleted?: boolean | null; }; export type GetTranscriptSegmentTopic = { @@ -107,6 +108,13 @@ export type Meeting = { host_room_url: string; start_date: string; end_date: string; + recording_type?: "none" | "local" | "cloud"; +}; + +export type recording_type = "none" | "local" | "cloud"; + +export type MeetingConsentRequest = { + consent_given: boolean; }; export type Page_GetTranscript_ = { @@ -215,6 +223,7 @@ export type UpdateTranscript = { share_mode?: "public" | "semi-private" | "private" | null; participants?: Array | null; reviewed?: boolean | null; + audio_deleted?: boolean | null; }; export type UserInfo = { @@ -229,6 +238,16 @@ export type ValidationError = { type: string; }; +export type WherebyWebhookEvent = { + apiVersion: string; + id: string; + createdAt: string; + type: string; + data: { + [key: string]: unknown; + }; +}; + export type Word = { text: string; start: number; @@ -238,6 +257,13 @@ export type Word = { export type MetricsResponse = unknown; +export type V1MeetingAudioConsentData = { + meetingId: string; + requestBody: MeetingConsentRequest; +}; + +export type V1MeetingAudioConsentResponse = unknown; + export type V1RoomsListData = { /** * Page number @@ -454,6 +480,12 @@ export type V1ZulipGetTopicsData = { export type V1ZulipGetTopicsResponse = Array; +export type V1WherebyWebhookData = { + requestBody: WherebyWebhookEvent; +}; + +export type V1WherebyWebhookResponse = unknown; + export type $OpenApiTs = { "/metrics": { get: { @@ -465,6 +497,21 @@ export type $OpenApiTs = { }; }; }; + "/v1/meetings/{meeting_id}/consent": { + post: { + req: V1MeetingAudioConsentData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/v1/rooms": { get: { req: V1RoomsListData; @@ -902,4 +949,19 @@ export type $OpenApiTs = { }; }; }; + "/v1/whereby": { + post: { + req: V1WherebyWebhookData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; diff --git a/www/app/layout.tsx b/www/app/layout.tsx index 5afc432c..b7ba328d 100644 --- a/www/app/layout.tsx +++ b/www/app/layout.tsx @@ -4,6 +4,7 @@ import SessionProvider from "./lib/SessionProvider"; import { ErrorProvider } from "./(errors)/errorContext"; import ErrorMessage from "./(errors)/errorMessage"; import { DomainContextProvider } from "./domainContext"; +import { RecordingConsentProvider } from "./recordingConsentContext"; import { getConfig } from "./lib/edgeConfig"; import { ErrorBoundary } from "@sentry/nextjs"; import { Providers } from "./providers"; @@ -68,12 +69,14 @@ export default async function RootLayout({ - "something went really wrong"

}> - - - {children} - -
+ + "something went really wrong"

}> + + + {children} + +
+
diff --git a/www/app/lib/WherebyEmbed.tsx b/www/app/lib/WherebyWebinarEmbed.tsx similarity index 90% rename from www/app/lib/WherebyEmbed.tsx rename to www/app/lib/WherebyWebinarEmbed.tsx index 6a7df0c7..94ce5d53 100644 --- a/www/app/lib/WherebyEmbed.tsx +++ b/www/app/lib/WherebyWebinarEmbed.tsx @@ -8,9 +8,11 @@ interface WherebyEmbedProps { onLeave?: () => void; } -export default function WherebyEmbed({ roomUrl, onLeave }: WherebyEmbedProps) { +// currently used for webinars only +export default function WherebyWebinarEmbed({ roomUrl, onLeave }: WherebyEmbedProps) { const wherebyRef = useRef(null); + // TODO extract common toast logic / styles to be used by consent toast on normal rooms const toast = useToast(); useEffect(() => { if (roomUrl && !localStorage.getItem("recording-notice-dismissed")) { diff --git a/www/app/recordingConsentContext.tsx b/www/app/recordingConsentContext.tsx new file mode 100644 index 00000000..80cf042a --- /dev/null +++ b/www/app/recordingConsentContext.tsx @@ -0,0 +1,113 @@ +"use client"; + +import React, { createContext, useContext, useEffect, useState } from "react"; + +type ConsentContextState = + | { ready: false } + | { + ready: true, + consentAnsweredForMeetings: Set + }; + +interface RecordingConsentContextValue { + state: ConsentContextState; + touch: (meetingId: string) => void; + hasConsent: (meetingId: string) => boolean; +} + +const RecordingConsentContext = createContext(undefined); + +export const useRecordingConsent = () => { + const context = useContext(RecordingConsentContext); + if (!context) { + throw new Error("useRecordingConsent must be used within RecordingConsentProvider"); + } + return context; +}; + +interface RecordingConsentProviderProps { + children: React.ReactNode; +} + +const LOCAL_STORAGE_KEY = "recording_consent_meetings"; + +export const RecordingConsentProvider: React.FC = ({ children }) => { + const [state, setState] = useState({ ready: false }); + + const safeWriteToStorage = (meetingIds: string[]): void => { + try { + if (typeof window !== 'undefined' && window.localStorage) { + localStorage.setItem(LOCAL_STORAGE_KEY, JSON.stringify(meetingIds)); + } + } catch (error) { + console.error("Failed to save consent data to localStorage:", error); + } + }; + + // writes to local storage and to the state of context both + const touch = (meetingId: string): void => { + + if (!state.ready) { + console.warn("Attempted to touch consent before context is ready"); + return; + } + + // has success regardless local storage write success: we don't handle that + // and don't want to crash anything with just consent functionality + const newSet = state.consentAnsweredForMeetings.has(meetingId) ? + state.consentAnsweredForMeetings : + new Set([...state.consentAnsweredForMeetings, meetingId]); + // note: preserves the set insertion order + const array = Array.from(newSet).slice(-5); // Keep latest 5 + safeWriteToStorage(array); + setState({ ready: true, consentAnsweredForMeetings: newSet }); + }; + + const hasConsent = (meetingId: string): boolean => { + if (!state.ready) return false; + return state.consentAnsweredForMeetings.has(meetingId); + }; + + // initialize on mount + useEffect(() => { + try { + if (typeof window === 'undefined' || !window.localStorage) { + setState({ ready: true, consentAnsweredForMeetings: new Set() }); + return; + } + + const stored = localStorage.getItem(LOCAL_STORAGE_KEY); + if (!stored) { + setState({ ready: true, consentAnsweredForMeetings: new Set() }); + return; + } + + const parsed = JSON.parse(stored); + if (!Array.isArray(parsed)) { + console.warn("Invalid consent data format in localStorage, resetting"); + setState({ ready: true, consentAnsweredForMeetings: new Set() }); + return; + } + + // pre-historic way of parsing! + const consentAnsweredForMeetings = new Set(parsed.filter(id => !!id && typeof id === 'string')); + setState({ ready: true, consentAnsweredForMeetings }); + } catch (error) { + // we don't want to fail the page here; the component is not essential. + console.error("Failed to parse consent data from localStorage:", error); + setState({ ready: true, consentAnsweredForMeetings: new Set() }); + } + }, []); + + const value: RecordingConsentContextValue = { + state, + touch, + hasConsent, + }; + + return ( + + {children} + + ); +}; \ No newline at end of file diff --git a/www/app/webinars/[title]/page.tsx b/www/app/webinars/[title]/page.tsx index 914bd4c4..04efacc0 100644 --- a/www/app/webinars/[title]/page.tsx +++ b/www/app/webinars/[title]/page.tsx @@ -5,7 +5,7 @@ import Image from "next/image"; import { notFound } from "next/navigation"; import useRoomMeeting from "../../[roomName]/useRoomMeeting"; import dynamic from "next/dynamic"; -const WherebyEmbed = dynamic(() => import("../../lib/WherebyEmbed"), { +const WherebyEmbed = dynamic(() => import("../../lib/WherebyWebinarEmbed"), { ssr: false, }); import { FormEvent } from "react";