Restart processing

This commit is contained in:
2024-07-17 18:09:36 +02:00
parent 74a1c69e1f
commit 562f2c94f9
8 changed files with 219 additions and 23 deletions

View File

@@ -0,0 +1,50 @@
from typing import Annotated, Optional
import celery
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import task_pipeline_process
router = APIRouter()
class ProcessStatus(BaseModel):
status: str
@router.post("/transcripts/{transcript_id}/process")
async def transcript_process(
transcript_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
user_id = user["sub"] if user else None
transcript = await transcripts_controller.get_by_id_for_http(
transcript_id, user_id=user_id
)
if transcript.locked:
raise HTTPException(status_code=400, detail="Transcript is locked")
if task_is_scheduled_or_active(
"reflector.pipelines.main_live_pipeline.task_pipeline_process",
transcript_id=transcript_id,
):
return ProcessStatus(status="already running")
# schedule a background task process the file
task_pipeline_process.delay(transcript_id=transcript_id)
return ProcessStatus(status="ok")
def task_is_scheduled_or_active(task_name: str, **kwargs):
inspect = celery.current_app.control.inspect()
for worker, tasks in (inspect.scheduled() | inspect.active()).items():
for task in tasks:
if task["name"] == task_name and task["kwargs"] == kwargs:
return True
return False