Files
reflector/server/reflector/pipelines/main_live_pipeline.py
Mathieu Virbel 6f0c7c1a5e feat(cleanup): add automatic data retention for public instances (#574)
* feat(cleanup): add automatic data retention for public instances

- Add Celery task to clean up anonymous data after configurable retention period
- Delete transcripts, meetings, and orphaned recordings older than retention days
- Only runs when PUBLIC_MODE is enabled to prevent accidental data loss
- Properly removes all associated files (local and S3 storage)
- Add manual cleanup tool for testing and intervention
- Configure retention via PUBLIC_DATA_RETENTION_DAYS setting (default: 7 days)

Fixes #571

* fix: apply pre-commit formatting fixes

* fix: properly delete recording files from storage during cleanup

- Add storage deletion for orphaned recordings in both cleanup task and manual tool
- Delete from storage before removing database records
- Log warnings if storage deletion fails but continue with database cleanup

* Apply suggestion from @pr-agent-monadical[bot]

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Apply suggestion from @pr-agent-monadical[bot]

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* refactor: cleanup_old_data for better logging

* fix: linting

* test: fix meeting cleanup test to not require room controller

- Simplify test by directly inserting meetings into database
- Remove dependency on non-existent rooms_controller.create method
- Tests now pass successfully

* fix: linting

* refactor: simplify cleanup tool to use worker implementation

- Remove duplicate cleanup logic from manual tool
- Use the same _cleanup_old_public_data function from worker
- Remove dry-run feature as requested
- Prevent code duplication and ensure consistency
- Update documentation to reflect changes

* refactor: split cleanup worker into smaller functions

- Move all imports to the top of the file
- Extract cleanup logic into separate functions:
  - cleanup_old_transcripts()
  - cleanup_old_meetings()
  - cleanup_orphaned_recordings()
  - log_cleanup_results()
- Make code more maintainable and testable
- Add days parameter support to Celery task
- Update manual tool to work with refactored code

* feat: add TypedDict typing for cleanup stats

- Add CleanupStats TypedDict for better type safety
- Update all function signatures to use proper typing
- Add return type annotations to _cleanup_old_public_data
- Improves code maintainability and IDE support

* feat: add CASCADE DELETE to meeting_consent foreign key

- Add ondelete="CASCADE" to meeting_consent.meeting_id foreign key
- Generate and apply migration to update existing constraint
- Remove manual consent deletion from cleanup code
- Add unit test to verify CASCADE DELETE behavior

* style: linting

* fix: alembic migration branchpoint

* fix: correct downgrade constraint name in CASCADE DELETE migration

* fix: regenerate CASCADE DELETE migration with proper constraint names

- Delete problematic migration and regenerate with correct names
- Use explicit constraint name in both upgrade and downgrade
- Ensure migration works bidirectionally
- All tests passing including CASCADE DELETE test

* style: linting

* refactor: simplify cleanup to use transcripts as entry point

- Remove orphaned_recordings cleanup (not part of this PR scope)
- Remove separate old_meetings cleanup
- Transcripts are now the main entry point for cleanup
- Associated meetings and recordings are deleted with their transcript
- Use single database connection for all operations
- Update tests to reflect new approach

* refactor: cleanup and rename functions for clarity

- Rename _cleanup_old_public_data to cleanup_old_public_data (make public)
- Rename celery task to cleanup_old_public_data_task for clarity
- Update docstrings and improve code organization
- Remove unnecessary comments and simplify deletion logic
- Update tests to use new function names
- All tests passing

* style: linting\

* style: typing and review

* fix: add transaction on cleanup_single_transcript

* fix: naming

---------

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-08-29 08:47:14 -06:00

829 lines
27 KiB
Python

"""
Main reflector pipeline for live streaming
==========================================
This is the default pipeline used in the API.
It is decoupled to:
- PipelineMainLive: have limited processing during live
- PipelineMainPost: do heavy lifting after the live
It is directly linked to our data model.
"""
import asyncio
import functools
from contextlib import asynccontextmanager
from typing import Generic
import av
import boto3
from celery import chord, current_task, group, shared_task
from pydantic import BaseModel
from structlog import BoundLogger as Logger
from reflector.asynctask import asynctask
from reflector.db.meetings import meeting_consent_controller, meetings_controller
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
Transcript,
TranscriptDuration,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
TranscriptFinalTitle,
TranscriptStatus,
TranscriptText,
TranscriptTopic,
TranscriptWaveform,
transcripts_controller,
)
from reflector.logger import logger
from reflector.pipelines.runner import PipelineMessage, PipelineRunner
from reflector.processors import (
AudioChunkerAutoProcessor,
AudioDiarizationAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
Pipeline,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor,
TranscriptTranslatorAutoProcessor,
)
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import AudioDiarizationInput
from reflector.processors.types import (
TitleSummaryWithId as TitleSummaryWithIdProcessorType,
)
from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.ws_manager import WebsocketManager, get_ws_manager
from reflector.zulip import (
get_zulip_message,
send_message_to_zulip,
update_zulip_message,
)
def broadcast_to_sockets(func):
"""
Decorator to broadcast transcript event to websockets
concerning this transcript
"""
async def wrapper(self, *args, **kwargs):
resp = await func(self, *args, **kwargs)
if resp is None:
return
await self.ws_manager.send_json(
room_id=self.ws_room_id,
message=resp.model_dump(mode="json"),
)
return wrapper
def get_transcript(func):
"""
Decorator to fetch the transcript from the database from the first argument
"""
@functools.wraps(func)
async def wrapper(**kwargs):
transcript_id = kwargs.pop("transcript_id")
transcript = await transcripts_controller.get_by_id(transcript_id=transcript_id)
if not transcript:
raise Exception("Transcript {transcript_id} not found")
# Enhanced logger with Celery task context
tlogger = logger.bind(transcript_id=transcript.id)
if current_task:
tlogger = tlogger.bind(
task_id=current_task.request.id,
task_name=current_task.name,
worker_hostname=current_task.request.hostname,
task_retries=current_task.request.retries,
transcript_id=transcript_id,
)
try:
result = await func(transcript=transcript, logger=tlogger, **kwargs)
return result
except Exception as exc:
tlogger.error("Pipeline error", function_name=func.__name__, exc_info=exc)
raise
return wrapper
class StrValue(BaseModel):
value: str
class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]):
def __init__(self, transcript_id: str):
super().__init__()
self._lock = asyncio.Lock()
self.transcript_id = transcript_id
self.ws_room_id = f"ts:{self.transcript_id}"
self._ws_manager = None
@property
def ws_manager(self) -> WebsocketManager:
if self._ws_manager is None:
self._ws_manager = get_ws_manager()
return self._ws_manager
async def get_transcript(self) -> Transcript:
# fetch the transcript
result = await transcripts_controller.get_by_id(
transcript_id=self.transcript_id
)
if not result:
raise Exception("Transcript not found")
return result
@staticmethod
def wrap_transcript_topics(
topics: list[TranscriptTopic],
) -> list[TitleSummaryWithIdProcessorType]:
# transformation to a pipe-supported format
return [
TitleSummaryWithIdProcessorType(
id=topic.id,
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
transcript=TranscriptProcessorType(words=topic.words),
)
for topic in topics
]
@asynccontextmanager
async def lock_transaction(self):
# This lock is to prevent multiple processor starting adding
# into event array at the same time
async with self._lock:
yield
@asynccontextmanager
async def transaction(self):
async with self.lock_transaction():
async with transcripts_controller.transaction():
yield
@broadcast_to_sockets
async def on_status(self, status):
# if it's the first part, update the status of the transcript
# but do not set the ended status yet.
if isinstance(self, PipelineMainLive):
status_mapping: dict[str, TranscriptStatus] = {
"started": "recording",
"push": "recording",
"flush": "processing",
"error": "error",
}
elif isinstance(self, PipelineMainFinalSummaries):
status_mapping: dict[str, TranscriptStatus] = {
"push": "processing",
"flush": "processing",
"error": "error",
"ended": "ended",
}
else:
# intermediate pipeline don't update status
return
# mutate to model status
status = status_mapping.get(status)
if not status:
return
# when the status of the pipeline changes, update the transcript
async with self._lock:
return await transcripts_controller.set_status(self.transcript_id, status)
@broadcast_to_sockets
async def on_transcript(self, data):
async with self.transaction():
transcript = await self.get_transcript()
return await transcripts_controller.append_event(
transcript=transcript,
event="TRANSCRIPT",
data=TranscriptText(text=data.text, translation=data.translation),
)
@broadcast_to_sockets
async def on_topic(self, data):
topic = TranscriptTopic(
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
transcript=data.transcript.text,
words=data.transcript.words,
)
if isinstance(data, TitleSummaryWithIdProcessorType):
topic.id = data.id
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.upsert_topic(transcript, topic)
return await transcripts_controller.append_event(
transcript=transcript,
event="TOPIC",
data=topic,
)
@broadcast_to_sockets
async def on_title(self, data):
final_title = TranscriptFinalTitle(title=data.title)
async with self.transaction():
transcript = await self.get_transcript()
if not transcript.title:
await transcripts_controller.update(
transcript,
{
"title": final_title.title,
},
)
return await transcripts_controller.append_event(
transcript=transcript,
event="FINAL_TITLE",
data=final_title,
)
@broadcast_to_sockets
async def on_long_summary(self, data):
final_long_summary = TranscriptFinalLongSummary(long_summary=data.long_summary)
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.update(
transcript,
{
"long_summary": final_long_summary.long_summary,
},
)
return await transcripts_controller.append_event(
transcript=transcript,
event="FINAL_LONG_SUMMARY",
data=final_long_summary,
)
@broadcast_to_sockets
async def on_short_summary(self, data):
final_short_summary = TranscriptFinalShortSummary(
short_summary=data.short_summary
)
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.update(
transcript,
{
"short_summary": final_short_summary.short_summary,
},
)
return await transcripts_controller.append_event(
transcript=transcript,
event="FINAL_SHORT_SUMMARY",
data=final_short_summary,
)
@broadcast_to_sockets
async def on_duration(self, data):
async with self.transaction():
duration = TranscriptDuration(duration=data)
transcript = await self.get_transcript()
await transcripts_controller.update(
transcript,
{
"duration": duration.duration,
},
)
return await transcripts_controller.append_event(
transcript=transcript, event="DURATION", data=duration
)
@broadcast_to_sockets
async def on_waveform(self, data):
async with self.transaction():
waveform = TranscriptWaveform(waveform=data)
transcript = await self.get_transcript()
return await transcripts_controller.append_event(
transcript=transcript, event="WAVEFORM", data=waveform
)
class PipelineMainLive(PipelineMainBase):
"""
Main pipeline for live streaming, attach to RTC connection
Any long post process should be done in the post pipeline
"""
async def create(self) -> Pipeline:
# create a context for the whole rtc transaction
# add a customised logger to the context
transcript = await self.get_transcript()
processors = [
AudioFileWriterProcessor(
path=transcript.audio_wav_filename,
on_duration=self.on_duration,
),
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
TranscriptLinerProcessor(),
TranscriptTranslatorAutoProcessor.as_threaded(callback=self.on_transcript),
TranscriptTopicDetectorProcessor.as_threaded(callback=self.on_topic),
]
pipeline = Pipeline(*processors)
pipeline.options = self
pipeline.set_pref("audio:source_language", transcript.source_language)
pipeline.set_pref("audio:target_language", transcript.target_language)
pipeline.logger.bind(transcript_id=transcript.id)
pipeline.logger.info("Pipeline main live created")
pipeline.describe()
return pipeline
async def on_ended(self):
# when the pipeline ends, connect to the post pipeline
logger.info("Pipeline main live ended", transcript_id=self.transcript_id)
logger.info("Scheduling pipeline main post", transcript_id=self.transcript_id)
pipeline_post(transcript_id=self.transcript_id)
class PipelineMainDiarization(PipelineMainBase[AudioDiarizationInput]):
"""
Diarize the audio and update topics
"""
async def create(self) -> Pipeline:
# create a context for the whole rtc transaction
# add a customised logger to the context
pipeline = Pipeline(
AudioDiarizationAutoProcessor(callback=self.on_topic),
)
pipeline.options = self
# now let's start the pipeline by pushing information to the
# first processor diarization processor
# XXX translation is lost when converting our data model to the processor model
transcript = await self.get_transcript()
# diarization works only if the file is uploaded to an external storage
if transcript.audio_location == "local":
pipeline.logger.info("Audio is local, skipping diarization")
return
audio_url = await transcript.get_audio_url()
audio_diarization_input = AudioDiarizationInput(
audio_url=audio_url,
topics=self.wrap_transcript_topics(transcript.topics),
)
# as tempting to use pipeline.push, prefer to use the runner
# to let the start just do one job.
pipeline.logger.bind(transcript_id=transcript.id)
pipeline.logger.info("Diarization pipeline created")
await self.push(audio_diarization_input)
await self.flush()
return pipeline
class PipelineMainFromTopics(PipelineMainBase[TitleSummaryWithIdProcessorType]):
"""
Pseudo class for generating a pipeline from topics
"""
def get_processors(self) -> list:
raise NotImplementedError
async def create(self) -> Pipeline:
# get transcript
self._transcript = transcript = await self.get_transcript()
# create pipeline
processors = self.get_processors()
pipeline = Pipeline(*processors)
pipeline.options = self
pipeline.logger.bind(transcript_id=transcript.id)
pipeline.logger.info(f"{self.__class__.__name__} pipeline created")
# push topics
topics = PipelineMainBase.wrap_transcript_topics(transcript.topics)
for topic in topics:
await self.push(topic)
await self.flush()
return pipeline
class PipelineMainTitle(PipelineMainFromTopics):
"""
Generate title from the topics
"""
def get_processors(self) -> list:
return [
TranscriptFinalTitleProcessor.as_threaded(callback=self.on_title),
]
class PipelineMainFinalSummaries(PipelineMainFromTopics):
"""
Generate summaries from the topics
"""
def get_processors(self) -> list:
return [
TranscriptFinalSummaryProcessor.as_threaded(
transcript=self._transcript,
callback=self.on_long_summary,
on_short_summary=self.on_short_summary,
),
]
class PipelineMainWaveform(PipelineMainFromTopics):
"""
Generate waveform
"""
def get_processors(self) -> list:
return [
AudioWaveformProcessor.as_threaded(
audio_path=self._transcript.audio_wav_filename,
waveform_path=self._transcript.audio_waveform_filename,
on_waveform=self.on_waveform,
),
]
@get_transcript
async def pipeline_remove_upload(transcript: Transcript, logger: Logger):
# for future changes: note that there's also a consent process happens, beforehand and users may not consent with keeping files. currently, we delete regardless, so it's no need for that
logger.info("Starting remove upload")
uploads = transcript.data_path.glob("upload.*")
for upload in uploads:
upload.unlink(missing_ok=True)
logger.info("Remove upload done")
@get_transcript
async def pipeline_waveform(transcript: Transcript, logger: Logger):
logger.info("Starting waveform")
runner = PipelineMainWaveform(transcript_id=transcript.id)
await runner.run()
logger.info("Waveform done")
@get_transcript
async def pipeline_convert_to_mp3(transcript: Transcript, logger: Logger):
logger.info("Starting convert to mp3")
# If the audio wav is not available, just skip
wav_filename = transcript.audio_wav_filename
if not wav_filename.exists():
logger.warning("Wav file not found, may be already converted")
return
# Convert to mp3
mp3_filename = transcript.audio_mp3_filename
with av.open(wav_filename.as_posix()) as in_container:
in_stream = in_container.streams.audio[0]
with av.open(mp3_filename.as_posix(), "w") as out_container:
out_stream = out_container.add_stream("mp3")
for frame in in_container.decode(in_stream):
for packet in out_stream.encode(frame):
out_container.mux(packet)
# Delete the wav file
transcript.audio_wav_filename.unlink(missing_ok=True)
logger.info("Convert to mp3 done")
@get_transcript
async def pipeline_upload_mp3(transcript: Transcript, logger: Logger):
if not settings.TRANSCRIPT_STORAGE_BACKEND:
logger.info("No storage backend configured, skipping mp3 upload")
return
if transcript.audio_deleted:
logger.info("Skipping mp3 upload - audio marked as deleted")
return
logger.info("Starting upload mp3")
# If the audio mp3 is not available, just skip
mp3_filename = transcript.audio_mp3_filename
if not mp3_filename.exists():
logger.warning("Mp3 file not found, may be already uploaded")
return
# Upload to external storage and delete the file
await transcripts_controller.move_mp3_to_storage(transcript)
logger.info("Upload mp3 done")
@get_transcript
async def pipeline_diarization(transcript: Transcript, logger: Logger):
logger.info("Starting diarization")
runner = PipelineMainDiarization(transcript_id=transcript.id)
await runner.run()
logger.info("Diarization done")
@get_transcript
async def pipeline_title(transcript: Transcript, logger: Logger):
logger.info("Starting title")
runner = PipelineMainTitle(transcript_id=transcript.id)
await runner.run()
logger.info("Title done")
@get_transcript
async def pipeline_summaries(transcript: Transcript, logger: Logger):
logger.info("Starting summaries")
runner = PipelineMainFinalSummaries(transcript_id=transcript.id)
await runner.run()
logger.info("Summaries done")
@get_transcript
async def cleanup_consent(transcript: Transcript, logger: Logger):
logger.info("Starting consent cleanup")
consent_denied = False
recording = None
try:
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if meeting:
consent_denied = await meeting_consent_controller.has_any_denial(
meeting.id
)
except Exception as e:
logger.error(f"Failed to get fetch consent: {e}", exc_info=e)
consent_denied = True
if not consent_denied:
logger.info("Consent approved, keeping all files")
return
logger.info("Consent denied, cleaning up all related audio files")
if recording and recording.bucket_name and recording.object_key:
s3_whereby = boto3.client(
"s3",
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
)
try:
s3_whereby.delete_object(
Bucket=recording.bucket_name, Key=recording.object_key
)
logger.info(
f"Deleted original Whereby recording: {recording.bucket_name}/{recording.object_key}"
)
except Exception as e:
logger.error(f"Failed to delete Whereby recording: {e}", exc_info=e)
# non-transactional, files marked for deletion not actually deleted is possible
await transcripts_controller.update(transcript, {"audio_deleted": True})
# 2. Delete processed audio from transcript storage S3 bucket
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
logger.info(
f"Deleted processed audio from storage: {transcript.storage_audio_path}"
)
except Exception as e:
logger.error(f"Failed to delete processed audio: {e}", exc_info=e)
# 3. Delete local audio files
try:
if hasattr(transcript, "audio_mp3_filename") and transcript.audio_mp3_filename:
transcript.audio_mp3_filename.unlink(missing_ok=True)
if hasattr(transcript, "audio_wav_filename") and transcript.audio_wav_filename:
transcript.audio_wav_filename.unlink(missing_ok=True)
except Exception as e:
logger.error(f"Failed to delete local audio files: {e}", exc_info=e)
logger.info("Consent cleanup done")
@get_transcript
async def pipeline_post_to_zulip(transcript: Transcript, logger: Logger):
logger.info("Starting post to zulip")
if not transcript.recording_id:
logger.info("Transcript has no recording")
return
recording = await recordings_controller.get_by_id(transcript.recording_id)
if not recording:
logger.info("Recording not found")
return
if not recording.meeting_id:
logger.info("Recording has no meeting")
return
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.info("No meeting found for this recording")
return
room = await rooms_controller.get_by_id(meeting.room_id)
if not room:
logger.error(f"Missing room for a meeting {meeting.id}")
return
if room.zulip_auto_post:
message = get_zulip_message(transcript=transcript, include_topics=True)
message_updated = False
if transcript.zulip_message_id:
try:
await update_zulip_message(
transcript.zulip_message_id,
room.zulip_stream,
room.zulip_topic,
message,
)
message_updated = True
except Exception:
logger.error(
f"Failed to update zulip message with id {transcript.zulip_message_id}"
)
if not message_updated:
response = await send_message_to_zulip(
room.zulip_stream, room.zulip_topic, message
)
await transcripts_controller.update(
transcript, {"zulip_message_id": response["id"]}
)
logger.info("Posted to zulip")
# ===================================================================
# Celery tasks that can be called from the API
# ===================================================================
@shared_task
@asynctask
async def task_pipeline_remove_upload(*, transcript_id: str):
await pipeline_remove_upload(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_waveform(*, transcript_id: str):
await pipeline_waveform(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_convert_to_mp3(*, transcript_id: str):
await pipeline_convert_to_mp3(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_upload_mp3(*, transcript_id: str):
await pipeline_upload_mp3(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_diarization(*, transcript_id: str):
await pipeline_diarization(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_title(*, transcript_id: str):
await pipeline_title(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_final_summaries(*, transcript_id: str):
await pipeline_summaries(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_cleanup_consent(*, transcript_id: str):
await cleanup_consent(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_post_to_zulip(*, transcript_id: str):
await pipeline_post_to_zulip(transcript_id=transcript_id)
def pipeline_post(*, transcript_id: str):
"""
Run the post pipeline
"""
chain_mp3_and_diarize = (
task_pipeline_waveform.si(transcript_id=transcript_id)
| task_pipeline_convert_to_mp3.si(transcript_id=transcript_id)
| task_pipeline_upload_mp3.si(transcript_id=transcript_id)
| task_pipeline_remove_upload.si(transcript_id=transcript_id)
| task_pipeline_diarization.si(transcript_id=transcript_id)
| task_cleanup_consent.si(transcript_id=transcript_id)
)
chain_title_preview = task_pipeline_title.si(transcript_id=transcript_id)
chain_final_summaries = task_pipeline_final_summaries.si(
transcript_id=transcript_id
)
chain = chord(
group(chain_mp3_and_diarize, chain_title_preview),
chain_final_summaries,
) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
return chain.delay()
@get_transcript
async def pipeline_process(transcript: Transcript, logger: Logger):
try:
if transcript.audio_location == "storage":
await transcripts_controller.download_mp3_from_storage(transcript)
transcript.audio_waveform_filename.unlink(missing_ok=True)
await transcripts_controller.update(
transcript,
{
"topics": [],
},
)
# open audio
audio_filename = next(transcript.data_path.glob("upload.*"), None)
if audio_filename and transcript.status != "uploaded":
raise Exception("File upload is not completed")
if not audio_filename:
audio_filename = next(transcript.data_path.glob("audio.*"), None)
if not audio_filename:
raise Exception("There is no file to process")
container = av.open(audio_filename.as_posix())
# create pipeline
pipeline = PipelineMainLive(transcript_id=transcript.id)
pipeline.start()
# push audio to pipeline
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
await pipeline.flush()
logger.info("Waiting for the pipeline to end")
await pipeline.join()
except Exception as exc:
logger.error("Pipeline error", exc_info=exc)
await transcripts_controller.update(
transcript,
{
"status": "error",
},
)
raise
logger.info("Pipeline ended")
@shared_task
@asynctask
async def task_pipeline_process(*, transcript_id: str):
return await pipeline_process(transcript_id=transcript_id)