mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 12:19:06 +00:00
meeting consent vibe
This commit is contained in:
@@ -18,3 +18,4 @@ DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run
|
||||
BASE_URL=https://xxxxx.ngrok.app
|
||||
DIARIZATION_ENABLED=false
|
||||
|
||||
SQS_POLLING_TIMEOUT_SECONDS=60
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
"""add meeting consent table
|
||||
|
||||
Revision ID: 20250617140003
|
||||
Revises: f819277e5169
|
||||
Create Date: 2025-06-17 14:00:03.000000
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "20250617140003"
|
||||
down_revision: Union[str, None] = "f819277e5169"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Create the ``meeting_consent`` table.

    The table has a string primary key ``id`` and a foreign key from
    ``meeting_id`` to ``meeting.id``. Every column except ``user_agent`` is
    NOT NULL at this revision.
    """
    columns = [
        sa.Column("id", sa.String(), nullable=False),
        sa.Column("meeting_id", sa.String(), nullable=False),
        sa.Column("user_identifier", sa.String(), nullable=False),
        sa.Column("consent_given", sa.Boolean(), nullable=False),
        sa.Column("consent_timestamp", sa.DateTime(), nullable=False),
        sa.Column("user_agent", sa.String(), nullable=True),
    ]
    constraints = [
        sa.PrimaryKeyConstraint("id"),
        sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"]),
    ]
    op.create_table("meeting_consent", *columns, *constraints)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``meeting_consent`` table, reverting ``upgrade``."""
    table_name = "meeting_consent"
    op.drop_table(table_name)
|
||||
@@ -0,0 +1,32 @@
|
||||
"""make user_identifier optional in meeting_consent
|
||||
|
||||
Revision ID: 38e116c82385
|
||||
Revises: 20250617140003
|
||||
Create Date: 2025-06-17 15:23:41.346980
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '38e116c82385'
|
||||
down_revision: Union[str, None] = '20250617140003'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Allow NULL values in ``meeting_consent.user_identifier``."""
    table_name, column_name = "meeting_consent", "user_identifier"
    op.alter_column(
        table_name,
        column_name,
        existing_type=sa.String(),
        nullable=True,
    )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Make ``meeting_consent.user_identifier`` NOT NULL again.

    NOTE(review): this presumably fails if rows with a NULL
    ``user_identifier`` were stored while the column was nullable; confirm
    whether such rows must be backfilled or removed before downgrading.
    """
    table_name, column_name = "meeting_consent", "user_identifier"
    op.alter_column(
        table_name,
        column_name,
        existing_type=sa.String(),
        nullable=False,
    )
|
||||
@@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown, subscribers_startup
|
||||
from reflector.logger import logger
|
||||
from reflector.metrics import metrics_init
|
||||
from reflector.settings import settings
|
||||
from reflector.views.meetings import router as meetings_router
|
||||
from reflector.views.rooms import router as rooms_router
|
||||
from reflector.views.rtc_offer import router as rtc_offer_router
|
||||
from reflector.views.transcripts import router as transcripts_router
|
||||
@@ -71,6 +72,7 @@ metrics_init(app, instrumentator)
|
||||
|
||||
# register views
|
||||
app.include_router(rtc_offer_router)
|
||||
app.include_router(meetings_router, prefix="/v1")
|
||||
app.include_router(rooms_router, prefix="/v1")
|
||||
app.include_router(transcripts_router, prefix="/v1")
|
||||
app.include_router(transcripts_audio_router, prefix="/v1")
|
||||
|
||||
@@ -3,9 +3,10 @@ from typing import Literal
|
||||
|
||||
import sqlalchemy as sa
|
||||
from fastapi import HTTPException
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
from reflector.db import database, metadata
|
||||
from reflector.db.rooms import Room
|
||||
from reflector.utils import generate_uuid4
|
||||
|
||||
meetings = sa.Table(
|
||||
"meeting",
|
||||
@@ -41,6 +42,26 @@ meetings = sa.Table(
|
||||
),
|
||||
)
|
||||
|
||||
# Consent answers attached to a meeting.
# Nullability mirrors the Alembic migrations (20250617140003 creates the table
# with these columns NOT NULL, 38e116c82385 makes user_identifier nullable) so
# that the table metadata and the migrated schema agree.
meeting_consent = sa.Table(
    "meeting_consent",
    metadata,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column(
        "meeting_id",
        sa.String,
        sa.ForeignKey("meeting.id"),
        nullable=False,
    ),
    sa.Column("user_identifier", sa.String, nullable=True),
    sa.Column("consent_given", sa.Boolean, nullable=False),
    sa.Column("consent_timestamp", sa.DateTime, nullable=False),
    sa.Column("user_agent", sa.String, nullable=True),
)
|
||||
|
||||
|
||||
class MeetingConsent(BaseModel):
    """Pydantic model with the same fields as the ``meeting_consent`` table."""

    # Generated with generate_uuid4 when the caller does not supply one.
    id: str = Field(default_factory=generate_uuid4)
    meeting_id: str
    # Optional: the controller treats a missing identifier as an anonymous user.
    user_identifier: str | None = None
    consent_given: bool
    consent_timestamp: datetime
    user_agent: str | None = None
|
||||
|
||||
|
||||
class Meeting(BaseModel):
|
||||
id: str
|
||||
@@ -116,7 +137,7 @@ class MeetingController:
|
||||
|
||||
async def get_active(self, room: Room, current_time: datetime) -> Meeting:
|
||||
"""
|
||||
Get latest meeting for a room.
|
||||
Get latest active meeting for a room.
|
||||
"""
|
||||
end_date = getattr(meetings.c, "end_date")
|
||||
query = (
|
||||
@@ -125,6 +146,7 @@ class MeetingController:
|
||||
sa.and_(
|
||||
meetings.c.room_id == room.id,
|
||||
meetings.c.end_date > current_time,
|
||||
meetings.c.is_active == True,
|
||||
)
|
||||
)
|
||||
.order_by(end_date.desc())
|
||||
@@ -167,4 +189,57 @@ class MeetingController:
|
||||
await database.execute(query)
|
||||
|
||||
|
||||
class MeetingConsentController:
    """Read/write helpers for rows of the ``meeting_consent`` table."""

    async def get_by_meeting_id(self, meeting_id: str) -> list[MeetingConsent]:
        """Return every consent record stored for ``meeting_id``."""
        rows = await database.fetch_all(
            meeting_consent.select().where(
                meeting_consent.c.meeting_id == meeting_id
            )
        )
        return [MeetingConsent(**row) for row in rows]

    async def get_by_meeting_and_user(
        self, meeting_id: str, user_identifier: str
    ) -> MeetingConsent | None:
        """Return the consent of ``user_identifier`` for ``meeting_id``, if any."""
        same_meeting = meeting_consent.c.meeting_id == meeting_id
        same_user = meeting_consent.c.user_identifier == user_identifier
        row = await database.fetch_one(
            meeting_consent.select().where(same_meeting, same_user)
        )
        if not row:
            return None
        return MeetingConsent(**row)

    async def create_or_update(self, consent: MeetingConsent) -> MeetingConsent:
        """Insert ``consent``, or overwrite the identified user's earlier answer.

        A lookup is only made when ``consent.user_identifier`` is set. If a
        record already exists for that user and meeting, its answer,
        timestamp and user agent are updated and that record is returned.
        Otherwise (anonymous user or first answer) a new row is inserted and
        ``consent`` itself is returned.
        """
        previous = None
        if consent.user_identifier:
            previous = await self.get_by_meeting_and_user(
                consent.meeting_id, consent.user_identifier
            )

        if not previous:
            # Anonymous user, or first answer from an identified user.
            await database.execute(
                meeting_consent.insert().values(**consent.model_dump())
            )
            return consent

        changes = {
            "consent_given": consent.consent_given,
            "consent_timestamp": consent.consent_timestamp,
            "user_agent": consent.user_agent,
        }
        await database.execute(
            meeting_consent.update()
            .where(meeting_consent.c.id == previous.id)
            .values(**changes)
        )

        # Mirror the stored values on the object handed back to the caller.
        for field_name, value in changes.items():
            setattr(previous, field_name, value)
        return previous

    async def has_any_denial(self, meeting_id: str) -> bool:
        """Return True if at least one record for ``meeting_id`` denies consent."""
        in_meeting = meeting_consent.c.meeting_id == meeting_id
        # "== False" builds a SQL expression here; it is not a Python comparison.
        denied = meeting_consent.c.consent_given == False  # noqa: E712
        first_denial = await database.fetch_one(
            meeting_consent.select().where(in_meeting, denied)
        )
        return first_denial is not None
|
||||
|
||||
|
||||
meetings_controller = MeetingController()
|
||||
meeting_consent_controller = MeetingConsentController()
|
||||
|
||||
@@ -138,6 +138,7 @@ class Settings(BaseSettings):
|
||||
HEALTHCHECK_URL: str | None = None
|
||||
|
||||
AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None
|
||||
SQS_POLLING_TIMEOUT_SECONDS: int = 60
|
||||
|
||||
WHEREBY_API_URL: str = "https://api.whereby.dev/v1"
|
||||
|
||||
|
||||
@@ -1 +1,7 @@
|
||||
from .base import Storage # noqa
|
||||
|
||||
def get_storage() -> Storage:
    """Return the storage instance named by ``settings.TRANSCRIPT_STORAGE_BACKEND``."""
    # Settings are imported at call time rather than at module import time
    # (presumably to avoid an import cycle -- TODO confirm).
    from reflector.settings import settings

    backend_name = settings.TRANSCRIPT_STORAGE_BACKEND
    return Storage.get_instance(name=backend_name)
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
def generate_uuid4() -> str:
    """Return a newly generated random UUID (version 4) in string form."""
    new_id = uuid4()
    return str(new_id)
|
||||
42
server/reflector/views/meetings.py
Normal file
42
server/reflector/views/meetings.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from datetime import datetime, timezone

from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel

from reflector.db.meetings import (
    MeetingConsent,
    meeting_consent_controller,
    meetings_controller,
)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
class MeetingConsentRequest(BaseModel):
    """Request body accepted by the meeting consent endpoint."""

    # The participant's answer.
    consent_given: bool
    # Optional identifier supplied by the client; None when omitted.
    user_identifier: str | None = None
|
||||
|
||||
|
||||
@router.post("/meetings/{meeting_id}/consent")
async def meeting_audio_consent(
    meeting_id: str,
    request: MeetingConsentRequest,
    user_request: Request,
):
    """Store a participant's consent answer for a meeting.

    Args:
        meeting_id: Identifier of the meeting taken from the URL path.
        request: Parsed request body (answer and optional user identifier).
        user_request: Raw HTTP request, used only to read the User-Agent header.

    Returns:
        A dict with ``status`` set to ``"success"`` and the ``consent_id`` of
        the stored (created or updated) consent record.

    Raises:
        HTTPException: 404 when no meeting with ``meeting_id`` exists.
    """
    meeting = await meetings_controller.get_by_id(meeting_id)
    if not meeting:
        raise HTTPException(status_code=404, detail="Meeting not found")

    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive
    # UTC value explicitly, because consent_timestamp is stored in a DateTime
    # column declared without timezone support.
    consent_timestamp = datetime.now(timezone.utc).replace(tzinfo=None)

    consent = MeetingConsent(
        meeting_id=meeting_id,
        user_identifier=request.user_identifier,
        consent_given=request.consent_given,
        consent_timestamp=consent_timestamp,
        user_agent=user_request.headers.get("user-agent"),
    )

    # create_or_update overwrites an earlier answer from the same identified
    # user; requests without an identifier always produce a new record.
    updated_consent = await meeting_consent_controller.create_or_update(consent)

    return {"status": "success", "consent_id": updated_consent.id}
|
||||
@@ -25,11 +25,11 @@ else:
|
||||
app.conf.beat_schedule = {
|
||||
"process_messages": {
|
||||
"task": "reflector.worker.process.process_messages",
|
||||
"schedule": 60.0,
|
||||
"schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
|
||||
},
|
||||
"process_meetings": {
|
||||
"task": "reflector.worker.process.process_meetings",
|
||||
"schedule": 60.0,
|
||||
"schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
|
||||
},
|
||||
"reprocess_failed_recordings": {
|
||||
"task": "reflector.worker.process.reprocess_failed_recordings",
|
||||
|
||||
@@ -9,10 +9,11 @@ import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from pydantic import ValidationError
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.meetings import meeting_consent_controller, meetings_controller
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import SourceKind, transcripts_controller
|
||||
from reflector.storage import get_storage
|
||||
from reflector.pipelines.main_live_pipeline import asynctask, task_pipeline_process
|
||||
from reflector.settings import settings
|
||||
from reflector.whereby import get_room_sessions
|
||||
@@ -130,6 +131,51 @@ async def process_recording(bucket_name: str, object_key: str):
|
||||
await transcripts_controller.update(transcript, {"status": "uploaded"})
|
||||
|
||||
task_pipeline_process.delay(transcript_id=transcript.id)
|
||||
|
||||
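# Check if any participant denied consent.
# NOTE(review): task_pipeline_process.delay() above only enqueues the pipeline task, so this check (and any audio deletion) runs before transcript processing has completed -- confirm this ordering is intended.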
|
||||
should_delete = await meeting_consent_controller.has_any_denial(meeting.id)
|
||||
if should_delete:
|
||||
logger.info(f"Deleting audio files for {object_key} due to consent denial")
|
||||
await delete_audio_files_only(transcript, bucket_name, object_key)
|
||||
|
||||
|
||||
async def delete_audio_files_only(transcript, bucket_name: str, object_key: str):
    """Delete ONLY audio files from all locations, keep transcript data.

    Steps, in order:

    1. Delete the original recording ``object_key`` from S3 bucket
       ``bucket_name`` using the Whereby AWS credentials from settings.
    2. Delete the processed audio from transcript storage when
       ``transcript.audio_location == "storage"``.
    3. Remove local audio files: ``audio_mp3_filename``,
       ``audio_wav_filename`` and ``upload<ext>`` under
       ``transcript.data_path`` (``<ext>`` is the extension of ``object_key``).
    4. Update the transcript with ``audio_location_deleted=True``.

    This is best effort: any exception is logged with its traceback and
    swallowed, so the function never raises. A failure skips the remaining
    steps, and ``audio_location_deleted`` is then not set.

    NOTE(review): step 3 removes the local ``upload<ext>`` file; confirm
    callers only invoke this once nothing still needs to read that file.

    Args:
        transcript: Transcript whose audio must be removed.
        bucket_name: S3 bucket holding the original recording.
        object_key: Key of the original recording inside ``bucket_name``.
    """
    try:
        # 1. Delete original Whereby recording from S3
        s3_whereby = boto3.client(
            "s3",
            aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
        )
        s3_whereby.delete_object(Bucket=bucket_name, Key=object_key)
        logger.info(f"Deleted original Whereby recording: {bucket_name}/{object_key}")

        # 2. Delete processed audio from transcript storage S3 bucket
        if transcript.audio_location == "storage":
            storage = get_storage()
            await storage.delete_file(transcript.storage_audio_path)
            logger.info(
                f"Deleted processed audio from storage: {transcript.storage_audio_path}"
            )

        # 3. Delete local audio files (if any remain)
        for attribute in ("audio_mp3_filename", "audio_wav_filename"):
            local_audio = getattr(transcript, attribute, None)
            if local_audio:
                local_audio.unlink(missing_ok=True)

        upload_path = transcript.data_path / f"upload{os.path.splitext(object_key)[1]}"
        upload_path.unlink(missing_ok=True)

        # 4. Update transcript to reflect audio deletion (keep all other data)
        await transcripts_controller.update(
            transcript,
            {"audio_location_deleted": True},
        )

        logger.info(
            f"Deleted all audio files for transcript {transcript.id}, kept transcript data"
        )

    except Exception as e:
        # Deliberately swallowed (best effort), but keep the traceback so a
        # failed consent-driven deletion can be diagnosed from the logs.
        logger.error(
            f"Failed to delete audio files for {object_key}: {str(e)}",
            exc_info=True,
        )
|
||||
|
||||
|
||||
@shared_task
|
||||
|
||||
Reference in New Issue
Block a user