diff --git a/server/reflector/pipelines/main_file_pipeline.py b/server/reflector/pipelines/main_file_pipeline.py index fb1f925c..08915f99 100644 --- a/server/reflector/pipelines/main_file_pipeline.py +++ b/server/reflector/pipelines/main_file_pipeline.py @@ -8,13 +8,11 @@ Uses parallel processing for transcription, diarization, and waveform generation import asyncio import uuid -from contextlib import asynccontextmanager from pathlib import Path import av import structlog from celery import chain, shared_task -from sqlalchemy.ext.asyncio import AsyncSession from reflector.asynctask import asynctask from reflector.db import get_session_factory @@ -23,7 +21,6 @@ from reflector.db.transcripts import ( SourceKind, Transcript, TranscriptStatus, - TranscriptTopic, transcripts_controller, ) from reflector.logger import logger @@ -87,26 +84,6 @@ class PipelineMainFile(PipelineMainBase): self.logger = logger.bind(transcript_id=self.transcript_id) self.empty_pipeline = EmptyPipeline(logger=self.logger) - async def get_transcript(self, session: AsyncSession) -> Transcript: - """Get transcript with session""" - result = await transcripts_controller.get_by_id(session, self.transcript_id) - if not result: - raise Exception("Transcript not found") - return result - - @asynccontextmanager - async def lock_transaction(self): - # This lock is to prevent multiple processor starting adding - # into event array at the same time - async with asyncio.Lock(): - yield - - @asynccontextmanager - async def transaction(self): - async with self.lock_transaction(): - async with get_session_factory()() as session: - yield session - def _handle_gather_exceptions(self, results: list, operation: str) -> None: """Handle exceptions from asyncio.gather with return_exceptions=True""" for i, result in enumerate(results): @@ -417,112 +394,6 @@ class PipelineMainFile(PipelineMainBase): await processor.flush() - async def on_topic(self, topic: TitleSummary): - """Handle topic event - save to database""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - topic_obj = TranscriptTopic( - title=topic.title, - summary=topic.summary, - timestamp=topic.timestamp, - duration=topic.duration, - ) - await transcripts_controller.upsert_topic(session, transcript, topic_obj) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="TOPIC", - data=topic_obj, - ) - - async def on_title(self, data): - """Handle title event""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - if not transcript.title: - await transcripts_controller.update( - session, - transcript, - {"title": data.title}, - ) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="FINAL_TITLE", - data={"title": data.title}, - ) - - async def on_long_summary(self, data): - """Handle long summary event""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - await transcripts_controller.update( - session, - transcript, - {"long_summary": data.long_summary}, - ) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="FINAL_LONG_SUMMARY", - data={"long_summary": data.long_summary}, - ) - - async def on_short_summary(self, data): - """Handle short summary event""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - await transcripts_controller.update( - session, - transcript, - {"short_summary": data.short_summary}, - ) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="FINAL_SHORT_SUMMARY", - data={"short_summary": data.short_summary}, - ) - - async def on_duration(self, duration): - """Handle duration event""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - await transcripts_controller.update( - session, - transcript, - {"duration": duration}, - ) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="DURATION", - data={"duration": duration}, - ) - - async def on_waveform(self, waveform): - """Handle waveform event""" - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) - await transcripts_controller.append_event( - session, - transcript=transcript, - event="WAVEFORM", - data={"waveform": waveform}, - ) - @shared_task @asynctask