fix: remove duplicated methods

This commit is contained in:
2025-09-23 16:47:30 -06:00
parent e0c71c5548
commit 0b2152ea75

View File

@@ -8,13 +8,11 @@ Uses parallel processing for transcription, diarization, and waveform generation
import asyncio import asyncio
import uuid import uuid
from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
import av import av
import structlog import structlog
from celery import chain, shared_task from celery import chain, shared_task
from sqlalchemy.ext.asyncio import AsyncSession
from reflector.asynctask import asynctask from reflector.asynctask import asynctask
from reflector.db import get_session_factory from reflector.db import get_session_factory
@@ -23,7 +21,6 @@ from reflector.db.transcripts import (
SourceKind, SourceKind,
Transcript, Transcript,
TranscriptStatus, TranscriptStatus,
TranscriptTopic,
transcripts_controller, transcripts_controller,
) )
from reflector.logger import logger from reflector.logger import logger
@@ -87,26 +84,6 @@ class PipelineMainFile(PipelineMainBase):
self.logger = logger.bind(transcript_id=self.transcript_id) self.logger = logger.bind(transcript_id=self.transcript_id)
self.empty_pipeline = EmptyPipeline(logger=self.logger) self.empty_pipeline = EmptyPipeline(logger=self.logger)
async def get_transcript(self, session: AsyncSession) -> Transcript:
"""Get transcript with session"""
result = await transcripts_controller.get_by_id(session, self.transcript_id)
if not result:
raise Exception("Transcript not found")
return result
@asynccontextmanager
async def lock_transaction(self):
# This lock is to prevent multiple processor starting adding
# into event array at the same time
async with asyncio.Lock():
yield
@asynccontextmanager
async def transaction(self):
async with self.lock_transaction():
async with get_session_factory()() as session:
yield session
def _handle_gather_exceptions(self, results: list, operation: str) -> None: def _handle_gather_exceptions(self, results: list, operation: str) -> None:
"""Handle exceptions from asyncio.gather with return_exceptions=True""" """Handle exceptions from asyncio.gather with return_exceptions=True"""
for i, result in enumerate(results): for i, result in enumerate(results):
@@ -417,112 +394,6 @@ class PipelineMainFile(PipelineMainBase):
await processor.flush() await processor.flush()
async def on_topic(self, topic: TitleSummary):
"""Handle topic event - save to database"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
topic_obj = TranscriptTopic(
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
)
await transcripts_controller.upsert_topic(session, transcript, topic_obj)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="TOPIC",
data=topic_obj,
)
async def on_title(self, data):
"""Handle title event"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
if not transcript.title:
await transcripts_controller.update(
session,
transcript,
{"title": data.title},
)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="FINAL_TITLE",
data={"title": data.title},
)
async def on_long_summary(self, data):
"""Handle long summary event"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
await transcripts_controller.update(
session,
transcript,
{"long_summary": data.long_summary},
)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="FINAL_LONG_SUMMARY",
data={"long_summary": data.long_summary},
)
async def on_short_summary(self, data):
"""Handle short summary event"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
await transcripts_controller.update(
session,
transcript,
{"short_summary": data.short_summary},
)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="FINAL_SHORT_SUMMARY",
data={"short_summary": data.short_summary},
)
async def on_duration(self, duration):
"""Handle duration event"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
await transcripts_controller.update(
session,
transcript,
{"duration": duration},
)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="DURATION",
data={"duration": duration},
)
async def on_waveform(self, waveform):
"""Handle waveform event"""
async with get_session_factory()() as session:
transcript = await transcripts_controller.get_by_id(
session, self.transcript_id
)
await transcripts_controller.append_event(
session,
transcript=transcript,
event="WAVEFORM",
data={"waveform": waveform},
)
@shared_task @shared_task
@asynctask @asynctask