remove conductor and add hatchet tests (no-mistakes)

Igor Loskutov
2025-12-16 13:23:39 -05:00
parent 1f49deb5b5
commit e81e0cb5c3
44 changed files with 537 additions and 4234 deletions

@@ -12,7 +12,6 @@ from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.logger import logger
from reflector.metrics import metrics_init
from reflector.settings import settings
from reflector.views.conductor import router as conductor_router
from reflector.views.daily import router as daily_router
from reflector.views.hatchet import router as hatchet_router
from reflector.views.meetings import router as meetings_router
@@ -100,7 +99,6 @@ app.include_router(user_ws_router, prefix="/v1")
app.include_router(zulip_router, prefix="/v1")
app.include_router(whereby_router, prefix="/v1")
app.include_router(daily_router, prefix="/v1/daily")
app.include_router(conductor_router, prefix="/v1")
app.include_router(hatchet_router, prefix="/v1")
add_pagination(app)

@@ -1,5 +0,0 @@
"""Conductor workflow orchestration module."""
from reflector.conductor.client import ConductorClientManager
__all__ = ["ConductorClientManager"]

@@ -1,40 +0,0 @@
"""Conductor Python client wrapper."""
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow_client import WorkflowClient
from reflector.settings import settings
class ConductorClientManager:
"""Singleton manager for Conductor client connections."""
_instance: OrkesClients | None = None
@classmethod
def get_client(cls) -> WorkflowClient:
"""Get or create the workflow client."""
if cls._instance is None:
config = Configuration(
server_api_url=settings.CONDUCTOR_SERVER_URL,
debug=settings.CONDUCTOR_DEBUG,
)
cls._instance = OrkesClients(config)
return cls._instance.get_workflow_client()
@classmethod
def start_workflow(cls, name: str, version: int, input_data: dict) -> str:
"""Start a workflow and return the workflow ID."""
client = cls.get_client()
return client.start_workflow_by_name(name, input_data, version=version)
@classmethod
def get_workflow_status(cls, workflow_id: str) -> dict:
"""Get the current status of a workflow."""
client = cls.get_client()
return client.get_workflow(workflow_id, include_tasks=True)
@classmethod
def reset(cls) -> None:
"""Reset the client instance (for testing)."""
cls._instance = None
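
As a reading aid, a minimal usage sketch for the manager removed above, assuming the Conductor server from settings is reachable and that a workflow (named "diarization_pipeline" here purely for illustration) is registered:

from reflector.conductor.client import ConductorClientManager

# Start a run; the input keys mirror the workflow's expected inputs.
workflow_id = ConductorClientManager.start_workflow(
    name="diarization_pipeline",  # illustrative name, not taken from the repo
    version=1,
    input_data={"recording_id": "rec-123", "transcript_id": "ts-456"},
)

# Fetch the run with its tasks; the shadow-mode code later in this diff reads .status off this object.
status = ConductorClientManager.get_workflow_status(workflow_id)
print(status.status)

# Tests can clear the cached OrkesClients instance between cases.
ConductorClientManager.reset()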

@@ -1,103 +0,0 @@
"""Progress event emission for Conductor workers."""
import asyncio
from typing import Literal
from reflector.db.transcripts import PipelineProgressData
from reflector.logger import logger
from reflector.ws_manager import get_ws_manager
# Step mapping for progress tracking
# Maps task names to their index in the pipeline
PIPELINE_STEPS = {
"get_recording": 1,
"get_participants": 2,
"pad_track": 3, # Fork tasks share same step
"mixdown_tracks": 4,
"generate_waveform": 5,
"transcribe_track": 6, # Fork tasks share same step
"merge_transcripts": 7,
"detect_topics": 8,
"generate_title": 9, # Fork tasks share same step
"generate_summary": 9, # Fork tasks share same step
"finalize": 10,
"cleanup_consent": 11,
"post_zulip": 12,
"send_webhook": 13,
}
TOTAL_STEPS = 13
async def _emit_progress_async(
transcript_id: str,
step: str,
status: Literal["pending", "in_progress", "completed", "failed"],
workflow_id: str | None = None,
) -> None:
"""Async implementation of progress emission."""
ws_manager = get_ws_manager()
step_index = PIPELINE_STEPS.get(step, 0)
data = PipelineProgressData(
workflow_id=workflow_id,
current_step=step,
step_index=step_index,
total_steps=TOTAL_STEPS,
step_status=status,
)
await ws_manager.send_json(
room_id=f"ts:{transcript_id}",
message={
"event": "PIPELINE_PROGRESS",
"data": data.model_dump(),
},
)
logger.debug(
"[Progress] Emitted",
transcript_id=transcript_id,
step=step,
status=status,
step_index=step_index,
)
def emit_progress(
transcript_id: str,
step: str,
status: Literal["pending", "in_progress", "completed", "failed"],
workflow_id: str | None = None,
) -> None:
"""Emit a pipeline progress event (sync wrapper for Conductor workers).
Args:
transcript_id: The transcript ID to emit progress for
step: The current step name (e.g., "transcribe_track")
status: The step status
workflow_id: Optional workflow ID
"""
try:
# Get or create event loop for sync context
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
# Already in async context, schedule the coroutine
asyncio.create_task(
_emit_progress_async(transcript_id, step, status, workflow_id)
)
else:
# Not in async context, run synchronously
asyncio.run(_emit_progress_async(transcript_id, step, status, workflow_id))
except Exception as e:
# Progress emission should never break the pipeline
logger.warning(
"[Progress] Failed to emit progress event",
error=str(e),
transcript_id=transcript_id,
step=step,
)
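
For reference, the workers later in this diff invoke the sync wrapper above like this (IDs are placeholders):

from reflector.conductor.progress import emit_progress

# Fire-and-forget: any failure is logged by emit_progress, never raised.
emit_progress(
    transcript_id="ts-456",   # placeholder
    step="transcribe_track",  # must be a key of PIPELINE_STEPS
    status="in_progress",
    workflow_id="wf-789",     # placeholder
)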

@@ -1,58 +0,0 @@
"""
Run Conductor workers for the diarization pipeline.
Usage:
uv run -m reflector.conductor.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.conductor.run_workers
"""
import signal
import sys
import time
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from reflector.conductor import workers # noqa: F401 - registers workers via decorators
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Conductor worker polling."""
if not settings.CONDUCTOR_ENABLED:
logger.error("CONDUCTOR_ENABLED is False, not starting workers")
sys.exit(1)
logger.info(
"Starting Conductor workers",
server_url=settings.CONDUCTOR_SERVER_URL,
)
config = Configuration(
server_api_url=settings.CONDUCTOR_SERVER_URL,
debug=settings.CONDUCTOR_DEBUG,
)
task_handler = TaskHandler(configuration=config)
# Handle graceful shutdown
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
task_handler.stop_processes()
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting task polling...")
task_handler.start_processes()
# Keep alive
while True:
time.sleep(1)
if __name__ == "__main__":
main()

@@ -1,207 +0,0 @@
"""Shadow mode comparison for Celery vs Conductor pipeline results."""
from dataclasses import dataclass
from typing import Any
from reflector.conductor.client import ConductorClientManager
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.logger import logger
@dataclass
class FieldDifference:
"""A difference between Celery and Conductor field values."""
field: str
celery_value: Any
conductor_value: Any
@dataclass
class ComparisonResult:
"""Result of comparing Celery and Conductor outputs."""
match: bool
differences: list[FieldDifference]
celery_status: str
conductor_status: str
error: str | None = None
async def compare_content_results(
recording_id: str, workflow_id: str
) -> ComparisonResult:
"""
Compare content results from Celery and Conductor pipelines.
Args:
recording_id: Recording ID to look up Celery transcript
workflow_id: Conductor workflow ID to get workflow output
Returns:
ComparisonResult with match status and any differences
"""
try:
# Get Celery result from DB
celery_transcript = await transcripts_controller.get_by_recording_id(
recording_id
)
if not celery_transcript:
return ComparisonResult(
match=False,
differences=[],
celery_status="not_found",
conductor_status="unknown",
error=f"No transcript found for recording_id={recording_id}",
)
# Get Conductor workflow status
workflow_status = ConductorClientManager.get_workflow_status(workflow_id)
conductor_status = workflow_status.status if workflow_status else "unknown"
# If workflow not completed, can't compare
if conductor_status != "COMPLETED":
return ComparisonResult(
match=False,
differences=[],
celery_status=celery_transcript.status,
conductor_status=conductor_status,
error=f"Conductor workflow not completed: {conductor_status}",
)
# Extract output from workflow
workflow_output = (
workflow_status.output if hasattr(workflow_status, "output") else {}
)
differences = _compare_fields(celery_transcript, workflow_output)
result = ComparisonResult(
match=len(differences) == 0,
differences=differences,
celery_status=celery_transcript.status,
conductor_status=conductor_status,
)
# Log comparison result
if result.match:
logger.info(
"Shadow mode comparison: MATCH",
recording_id=recording_id,
workflow_id=workflow_id,
)
else:
logger.warning(
"Shadow mode comparison: MISMATCH",
recording_id=recording_id,
workflow_id=workflow_id,
differences=[
{
"field": d.field,
"celery": d.celery_value,
"conductor": d.conductor_value,
}
for d in differences
],
)
return result
except Exception as e:
logger.error(
"Shadow mode comparison failed",
recording_id=recording_id,
workflow_id=workflow_id,
error=str(e),
exc_info=True,
)
return ComparisonResult(
match=False,
differences=[],
celery_status="unknown",
conductor_status="unknown",
error=str(e),
)
def _compare_fields(
celery_transcript: Transcript, workflow_output: dict
) -> list[FieldDifference]:
"""Compare specific content fields between Celery and Conductor."""
differences = []
# Compare title
conductor_title = workflow_output.get("title")
if celery_transcript.title != conductor_title:
differences.append(
FieldDifference(
field="title",
celery_value=celery_transcript.title,
conductor_value=conductor_title,
)
)
# Compare short_summary
conductor_short_summary = workflow_output.get("short_summary")
if celery_transcript.short_summary != conductor_short_summary:
differences.append(
FieldDifference(
field="short_summary",
celery_value=celery_transcript.short_summary,
conductor_value=conductor_short_summary,
)
)
# Compare long_summary
conductor_long_summary = workflow_output.get("summary")
if celery_transcript.long_summary != conductor_long_summary:
differences.append(
FieldDifference(
field="long_summary",
celery_value=celery_transcript.long_summary,
conductor_value=conductor_long_summary,
)
)
# Compare topic count
celery_topics = celery_transcript.topics or []
conductor_topics = workflow_output.get("topics", [])
if len(celery_topics) != len(conductor_topics):
differences.append(
FieldDifference(
field="topic_count",
celery_value=len(celery_topics),
conductor_value=len(conductor_topics),
)
)
# Compare word count from events
celery_events = celery_transcript.events or {}
celery_words = (
celery_events.get("words", []) if isinstance(celery_events, dict) else []
)
conductor_words = workflow_output.get("all_words", [])
if len(celery_words) != len(conductor_words):
differences.append(
FieldDifference(
field="word_count",
celery_value=len(celery_words),
conductor_value=len(conductor_words),
)
)
# Compare duration
conductor_duration = workflow_output.get("duration")
if (
conductor_duration is not None
and celery_transcript.duration != conductor_duration
):
differences.append(
FieldDifference(
field="duration",
celery_value=celery_transcript.duration,
conductor_value=conductor_duration,
)
)
return differences
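
A sketch of how this comparison would be driven once both pipelines have run for the same recording; the module path and IDs are assumptions for illustration:

import asyncio

from reflector.conductor.shadow import compare_content_results  # assumed module path

async def check_shadow_run() -> None:
    result = await compare_content_results(
        recording_id="rec-123",  # placeholder
        workflow_id="wf-789",    # placeholder
    )
    if not result.match:
        for diff in result.differences:
            print(diff.field, diff.celery_value, diff.conductor_value)

asyncio.run(check_shadow_run())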

@@ -1,6 +0,0 @@
"""Conductor task definitions module."""
from reflector.conductor.tasks.definitions import TASK_DEFINITIONS
from reflector.conductor.tasks.register import register_task_definitions
__all__ = ["TASK_DEFINITIONS", "register_task_definitions"]

@@ -1,161 +0,0 @@
"""Task definitions for Conductor workflow orchestration.
Timeout reference (from CONDUCTOR_MIGRATION_REQUIREMENTS.md):
| Task | Timeout (s) | Response Timeout (s) | Retry Count |
|-------------------|-------------|----------------------|-------------|
| get_recording | 60 | 30 | 3 |
| get_participants | 60 | 30 | 3 |
| pad_track | 300 | 120 | 3 |
| mixdown_tracks | 600 | 300 | 3 |
| generate_waveform | 120 | 60 | 3 |
| transcribe_track | 1800 | 900 | 3 |
| merge_transcripts | 60 | 30 | 3 |
| detect_topics | 300 | 120 | 3 |
| generate_title | 60 | 30 | 3 |
| generate_summary | 300 | 120 | 3 |
| finalize | 60 | 30 | 3 |
| cleanup_consent | 60 | 30 | 3 |
| post_zulip | 60 | 30 | 5 |
| send_webhook | 60 | 30 | 30 |
"""
OWNER_EMAIL = "reflector@example.com"
TASK_DEFINITIONS = [
{
"name": "get_recording",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["recording_id"],
"outputKeys": ["id", "mtg_session_id", "room_name", "duration"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "get_participants",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["mtg_session_id"],
"outputKeys": ["participants"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "pad_track",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["track_index", "s3_key", "bucket_name", "transcript_id"],
"outputKeys": ["padded_url", "size", "track_index"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "mixdown_tracks",
"retryCount": 3,
"timeoutSeconds": 600,
"responseTimeoutSeconds": 300,
"inputKeys": ["padded_urls", "transcript_id"],
"outputKeys": ["audio_key", "duration", "size"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_waveform",
"retryCount": 3,
"timeoutSeconds": 120,
"responseTimeoutSeconds": 60,
"inputKeys": ["audio_key", "transcript_id"],
"outputKeys": ["waveform"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "transcribe_track",
"retryCount": 3,
"timeoutSeconds": 1800,
"responseTimeoutSeconds": 900,
"inputKeys": ["track_index", "audio_url", "language"],
"outputKeys": ["words", "track_index"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "merge_transcripts",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcripts", "transcript_id"],
"outputKeys": ["all_words", "word_count"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "detect_topics",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["words", "transcript_id", "target_language"],
"outputKeys": ["topics"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_title",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["topics", "transcript_id"],
"outputKeys": ["title"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_summary",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["words", "topics", "transcript_id"],
"outputKeys": ["summary", "short_summary"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "finalize",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id", "title", "summary", "short_summary", "duration"],
"outputKeys": ["status"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "cleanup_consent",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id"],
"outputKeys": ["audio_deleted", "reason"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "post_zulip",
"retryCount": 5,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id"],
"outputKeys": ["message_id"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "send_webhook",
"retryCount": 30,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id", "room_id"],
"outputKeys": ["sent", "status_code"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_dynamic_fork_tasks",
"retryCount": 3,
"timeoutSeconds": 30,
"responseTimeoutSeconds": 15,
"inputKeys": ["tracks", "task_type", "transcript_id", "bucket_name"],
"outputKeys": ["tasks", "inputs"],
"ownerEmail": OWNER_EMAIL,
"description": "Helper task to generate dynamic fork structure for variable track counts",
},
]
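
A small, purely illustrative consistency check over the list above, mirroring the timeout table in the module docstring:

from reflector.conductor.tasks.definitions import TASK_DEFINITIONS

# Each task should be expected to respond well before its hard timeout expires.
for td in TASK_DEFINITIONS:
    assert td["responseTimeoutSeconds"] <= td["timeoutSeconds"], td["name"]
    assert td["retryCount"] >= 1, td["name"]
print(f"{len(TASK_DEFINITIONS)} task definitions look consistent")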

@@ -1,60 +0,0 @@
"""Register task definitions with Conductor server."""
import httpx
from reflector.conductor.tasks.definitions import TASK_DEFINITIONS
from reflector.logger import logger
from reflector.settings import settings
def register_task_definitions() -> None:
"""Register all task definitions with Conductor server.
Raises:
httpx.HTTPStatusError: If registration fails.
"""
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/taskdefs"
logger.info(
"Registering task definitions",
count=len(TASK_DEFINITIONS),
url=url,
)
with httpx.Client(timeout=30.0) as client:
resp = client.post(
url,
json=TASK_DEFINITIONS,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Task definitions registered successfully")
async def register_task_definitions_async() -> None:
"""Async version of register_task_definitions."""
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/taskdefs"
logger.info(
"Registering task definitions",
count=len(TASK_DEFINITIONS),
url=url,
)
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
url,
json=TASK_DEFINITIONS,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Task definitions registered successfully")
if __name__ == "__main__":
register_task_definitions()
print(f"Registered {len(TASK_DEFINITIONS)} task definitions")

@@ -1,37 +0,0 @@
"""Conductor workers for the diarization pipeline."""
from reflector.conductor.workers.cleanup_consent import cleanup_consent
from reflector.conductor.workers.detect_topics import detect_topics
from reflector.conductor.workers.finalize import finalize
from reflector.conductor.workers.generate_dynamic_fork_tasks import (
generate_dynamic_fork_tasks,
)
from reflector.conductor.workers.generate_summary import generate_summary
from reflector.conductor.workers.generate_title import generate_title
from reflector.conductor.workers.generate_waveform import generate_waveform
from reflector.conductor.workers.get_participants import get_participants
from reflector.conductor.workers.get_recording import get_recording
from reflector.conductor.workers.merge_transcripts import merge_transcripts
from reflector.conductor.workers.mixdown_tracks import mixdown_tracks
from reflector.conductor.workers.pad_track import pad_track
from reflector.conductor.workers.post_zulip import post_zulip
from reflector.conductor.workers.send_webhook import send_webhook
from reflector.conductor.workers.transcribe_track import transcribe_track
__all__ = [
"get_recording",
"get_participants",
"pad_track",
"mixdown_tracks",
"generate_waveform",
"transcribe_track",
"merge_transcripts",
"detect_topics",
"generate_title",
"generate_summary",
"finalize",
"cleanup_consent",
"post_zulip",
"send_webhook",
"generate_dynamic_fork_tasks",
]

@@ -1,126 +0,0 @@
"""Conductor worker: cleanup_consent - Check consent and delete audio if denied."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="cleanup_consent")
def cleanup_consent(task: Task) -> TaskResult:
"""Check participant consent and delete audio if denied.
Input:
transcript_id: str - Transcript ID
Output:
audio_deleted: bool - Whether audio was deleted
reason: str | None - Reason for deletion
"""
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] cleanup_consent", transcript_id=transcript_id)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
# Check if any participant denied consent
# This mirrors the logic from main_live_pipeline.task_cleanup_consent
audio_deleted = False
reason = None
if transcript.participants:
for p in transcript.participants:
if hasattr(p, "consent") and p.consent == "denied":
audio_deleted = True
reason = f"Participant {p.name or p.id} denied consent"
break
if audio_deleted:
storage = get_transcripts_storage()
audio_key = f"{transcript_id}/audio.mp3"
try:
await storage.delete_file(audio_key)
await transcripts_controller.update(
transcript, {"audio_deleted": True}
)
logger.info(
"[Worker] cleanup_consent: audio deleted",
transcript_id=transcript_id,
reason=reason,
)
except Exception as e:
logger.warning(
"[Worker] cleanup_consent: failed to delete audio",
error=str(e),
)
return audio_deleted, reason
finally:
await db.disconnect()
_database_context.set(None)
try:
audio_deleted, reason = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"audio_deleted": audio_deleted,
"reason": reason,
}
logger.info(
"[Worker] cleanup_consent complete",
transcript_id=transcript_id,
audio_deleted=audio_deleted,
)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] cleanup_consent failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "failed", task.workflow_instance_id
)
return task_result
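
The inline database setup that cleanup_consent repeats (and that finalize and generate_summary below duplicate) amounts to the following pattern, shown here as an illustrative helper that does not exist in the repo:

import contextlib

import databases

from reflector.db import _database_context
from reflector.settings import settings

@contextlib.asynccontextmanager
async def fresh_database():
    # Give the worker process its own connection instead of one inherited from the parent.
    _database_context.set(None)
    db = databases.Database(settings.DATABASE_URL)
    _database_context.set(db)
    await db.connect()
    try:
        yield db
    finally:
        await db.disconnect()
        _database_context.set(None)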

@@ -1,93 +0,0 @@
"""Conductor worker: detect_topics - Detect topics using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="detect_topics")
def detect_topics(task: Task) -> TaskResult:
"""Detect topics using LLM.
Input:
words: list[dict] - Transcribed words
transcript_id: str - Transcript ID
target_language: str - Target language code (default: "en")
Output:
topics: list[dict] - Detected topics
"""
words = task.input_data.get("words", [])
transcript_id = task.input_data.get("transcript_id")
target_language = task.input_data.get("target_language", "en")
logger.info(
"[Worker] detect_topics",
word_count=len(words),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
import asyncio
async def _process():
from reflector.pipelines import topic_processing
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import Word
# Convert word dicts to Word objects
word_objects = [Word(**w) for w in words]
transcript = TranscriptType(words=word_objects)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async def noop_callback(t):
pass
topics = await topic_processing.detect_topics(
transcript,
target_language,
on_topic_callback=noop_callback,
empty_pipeline=empty_pipeline,
)
return [t.model_dump() for t in topics]
try:
topics = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"topics": topics}
logger.info(
"[Worker] detect_topics complete",
transcript_id=transcript_id,
topic_count=len(topics),
)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] detect_topics failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "failed", task.workflow_instance_id
)
return task_result

@@ -1,111 +0,0 @@
"""Conductor worker: finalize - Finalize transcript status and update database."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="finalize")
def finalize(task: Task) -> TaskResult:
"""Finalize the transcript status and update the database.
Input:
transcript_id: str - Transcript ID
title: str - Generated title
summary: str - Long summary
short_summary: str - Short summary
duration: float - Audio duration
Output:
status: str - "COMPLETED"
"""
transcript_id = task.input_data.get("transcript_id")
title = task.input_data.get("title", "")
summary = task.input_data.get("summary", "")
short_summary = task.input_data.get("short_summary", "")
duration = task.input_data.get("duration", 0)
logger.info(
"[Worker] finalize",
transcript_id=transcript_id,
title=title,
)
if transcript_id:
emit_progress(
transcript_id, "finalize", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
await transcripts_controller.update(
transcript,
{
"status": "ended",
"title": title,
"long_summary": summary,
"short_summary": short_summary,
"duration": duration,
},
)
return True
finally:
await db.disconnect()
_database_context.set(None)
try:
asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"status": "COMPLETED"}
logger.info(
"[Worker] finalize complete",
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "finalize", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] finalize failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "finalize", "failed", task.workflow_instance_id
)
return task_result

@@ -1,110 +0,0 @@
"""Conductor worker: generate_dynamic_fork_tasks - Helper for FORK_JOIN_DYNAMIC."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.logger import logger
@worker_task(task_definition_name="generate_dynamic_fork_tasks")
def generate_dynamic_fork_tasks(task: Task) -> TaskResult:
"""Generate dynamic fork task structure for variable track counts.
This helper task generates the task definitions and inputs needed for
FORK_JOIN_DYNAMIC to process N tracks in parallel.
Input:
tracks: list[dict] - List of track info with s3_key
task_type: str - Either "pad_track" or "transcribe_track"
transcript_id: str - Transcript ID
bucket_name: str - S3 bucket name (for pad_track)
padded_urls: list[dict] - Padded track outputs (for transcribe_track)
Output:
tasks: list[dict] - Task definitions for dynamic fork
inputs: dict - Input parameters keyed by task reference name
"""
tracks = task.input_data.get("tracks", [])
task_type = task.input_data.get("task_type")
transcript_id = task.input_data.get("transcript_id")
bucket_name = task.input_data.get("bucket_name")
padded_urls = task.input_data.get("padded_urls", {})
logger.info(
"[Worker] generate_dynamic_fork_tasks",
task_type=task_type,
track_count=len(tracks),
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not tracks or not task_type:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing tracks or task_type"
return task_result
try:
tasks = []
inputs = {}
for idx, track in enumerate(tracks):
ref_name = f"{task_type}_{idx}"
# Task definition
tasks.append(
{
"name": task_type,
"taskReferenceName": ref_name,
"type": "SIMPLE",
}
)
# Task input based on type
if task_type == "pad_track":
inputs[ref_name] = {
"track_index": idx,
"s3_key": track.get("s3_key"),
"bucket_name": bucket_name,
"transcript_id": transcript_id,
}
elif task_type == "transcribe_track":
# Get padded URL from previous fork join output
padded_url = None
if isinstance(padded_urls, dict):
# Try to get from join output structure
pad_ref = f"pad_track_{idx}"
if pad_ref in padded_urls:
padded_url = padded_urls[pad_ref].get("padded_url")
elif "padded_url" in padded_urls:
# Single track case
padded_url = padded_urls.get("padded_url")
inputs[ref_name] = {
"track_index": idx,
"audio_url": padded_url,
"language": "en",
"transcript_id": transcript_id,
}
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"tasks": tasks,
"inputs": inputs,
}
logger.info(
"[Worker] generate_dynamic_fork_tasks complete",
task_type=task_type,
task_count=len(tasks),
)
except Exception as e:
logger.error("[Worker] generate_dynamic_fork_tasks failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
return task_result
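
For context, a FORK_JOIN_DYNAMIC task in the workflow definition would consume this worker's "tasks" and "inputs" outputs roughly as follows; reference names are placeholders and the field names follow Conductor's documented dynamic-fork contract:

# Sketch of the workflow tasks that sit after generate_dynamic_fork_tasks.
dynamic_fork = {
    "name": "pad_tracks_fork",                   # placeholder
    "taskReferenceName": "pad_tracks_fork_ref",  # placeholder
    "type": "FORK_JOIN_DYNAMIC",
    "inputParameters": {
        "dynamicTasks": "${generate_fork_ref.output.tasks}",
        "dynamicTasksInput": "${generate_fork_ref.output.inputs}",
    },
    "dynamicForkTasksParam": "dynamicTasks",
    "dynamicForkTasksInputParamName": "dynamicTasksInput",
}
join_after_fork = {
    "name": "pad_tracks_join",                   # placeholder
    "taskReferenceName": "pad_tracks_join_ref",  # placeholder
    "type": "JOIN",
}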

@@ -1,150 +0,0 @@
"""Conductor worker: generate_summary - Generate meeting summaries using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="generate_summary")
def generate_summary(task: Task) -> TaskResult:
"""Generate long and short summaries from topics and words using LLM.
Input:
words: list[dict] - Transcribed words
topics: list[dict] - Detected topics
transcript_id: str - Transcript ID
Output:
summary: str - Long summary
short_summary: str - Short summary
"""
words = task.input_data.get("words", [])
topics = task.input_data.get("topics", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_summary",
word_count=len(words),
topic_count=len(topics),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "generate_summary", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines import topic_processing
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
# Create fresh database connection for subprocess (not shared from parent)
# Reset context var to ensure we get a fresh connection
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
# detect_topics returns TitleSummary objects (with transcript: Transcript)
# When serialized, transcript becomes {translation, words} dict
# We need to reconstruct TitleSummary objects with proper Transcript
def normalize_topic(t):
topic = dict(t)
transcript_data = topic.get("transcript")
if isinstance(transcript_data, dict):
# Reconstruct Transcript object from serialized dict
words_list = transcript_data.get("words", [])
word_objects = [
Word(**w) if isinstance(w, dict) else w for w in words_list
]
topic["transcript"] = TranscriptType(
words=word_objects,
translation=transcript_data.get("translation"),
)
elif transcript_data is None:
topic["transcript"] = TranscriptType(words=[])
return topic
topic_objects = [TitleSummary(**normalize_topic(t)) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
transcript = await transcripts_controller.get_by_id(transcript_id)
long_summary = ""
short_summary = ""
async def on_long(s):
nonlocal long_summary
# s is FinalLongSummary object
long_summary = s.long_summary if hasattr(s, "long_summary") else str(s)
async def on_short(s):
nonlocal short_summary
# s is FinalShortSummary object
short_summary = (
s.short_summary if hasattr(s, "short_summary") else str(s)
)
await topic_processing.generate_summaries(
topic_objects,
transcript,
on_long_summary_callback=on_long,
on_short_summary_callback=on_short,
empty_pipeline=empty_pipeline,
logger=logger,
)
return long_summary, short_summary
finally:
await db.disconnect()
_database_context.set(None)
try:
summary, short_summary = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"summary": summary,
"short_summary": short_summary,
}
logger.info(
"[Worker] generate_summary complete",
transcript_id=transcript_id,
summary_len=len(summary) if summary else 0,
)
if transcript_id:
emit_progress(
transcript_id,
"generate_summary",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] generate_summary failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_summary", "failed", task.workflow_instance_id
)
return task_result

@@ -1,111 +0,0 @@
"""Conductor worker: generate_title - Generate meeting title using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="generate_title")
def generate_title(task: Task) -> TaskResult:
"""Generate meeting title from detected topics using LLM.
Input:
topics: list[dict] - Detected topics
transcript_id: str - Transcript ID
Output:
title: str - Generated title
"""
topics = task.input_data.get("topics", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_title",
topic_count=len(topics),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not topics:
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"title": "Untitled Meeting"}
return task_result
import asyncio
async def _process():
from reflector.pipelines import topic_processing
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
# detect_topics returns TitleSummary objects (with transcript: Transcript)
# When serialized, transcript becomes {translation, words} dict
# We need to reconstruct TitleSummary objects with proper Transcript
def normalize_topic(t):
topic = dict(t)
transcript_data = topic.get("transcript")
if isinstance(transcript_data, dict):
# Reconstruct Transcript object from serialized dict
words_list = transcript_data.get("words", [])
word_objects = [
Word(**w) if isinstance(w, dict) else w for w in words_list
]
topic["transcript"] = TranscriptType(
words=word_objects, translation=transcript_data.get("translation")
)
elif transcript_data is None:
topic["transcript"] = TranscriptType(words=[])
return topic
topic_objects = [TitleSummary(**normalize_topic(t)) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async def noop_callback(t):
pass
title = await topic_processing.generate_title(
topic_objects,
on_title_callback=noop_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
return title
try:
title = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"title": title}
logger.info(
"[Worker] generate_title complete",
transcript_id=transcript_id,
title=title,
)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] generate_title failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "failed", task.workflow_instance_id
)
return task_result

@@ -1,106 +0,0 @@
"""Conductor worker: generate_waveform - Generate waveform visualization data."""
import tempfile
from pathlib import Path
import httpx
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.storage import get_transcripts_storage
from reflector.utils.audio_waveform import get_audio_waveform
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
@worker_task(task_definition_name="generate_waveform")
def generate_waveform(task: Task) -> TaskResult:
"""Generate waveform visualization data from mixed audio.
Input:
audio_key: str - S3 key of the audio file
transcript_id: str - Transcript ID
Output:
waveform: list[float] - Waveform peaks array
"""
audio_key = task.input_data.get("audio_key")
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_waveform", audio_key=audio_key, transcript_id=transcript_id
)
if transcript_id:
emit_progress(
transcript_id, "generate_waveform", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not audio_key or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing audio_key or transcript_id"
return task_result
import asyncio
async def _process():
storage = get_transcripts_storage()
audio_url = await storage.get_file_url(
audio_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
# Download audio to temp file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
tmp_path = Path(tmp.name)
async with httpx.AsyncClient() as client:
resp = await client.get(audio_url)
resp.raise_for_status()
tmp.write(resp.content)
try:
waveform = get_audio_waveform(tmp_path, segments_count=255)
finally:
tmp_path.unlink(missing_ok=True)
return waveform
try:
waveform = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"waveform": waveform}
logger.info(
"[Worker] generate_waveform complete",
transcript_id=transcript_id,
peaks_count=len(waveform) if waveform else 0,
)
if transcript_id:
emit_progress(
transcript_id,
"generate_waveform",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] generate_waveform failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_waveform", "failed", task.workflow_instance_id
)
return task_result

@@ -1,96 +0,0 @@
"""Conductor worker: get_participants - Fetch meeting participants from Daily.co API."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.dailyco_api.client import DailyApiClient
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="get_participants")
def get_participants(task: Task) -> TaskResult:
"""Fetch meeting participants from Daily.co API.
Input:
mtg_session_id: str - Daily.co meeting session identifier
transcript_id: str - Transcript ID for progress tracking
Output:
participants: list[dict] - List of participant info
- participant_id: str
- user_name: str | None
- user_id: str | None
"""
mtg_session_id = task.input_data.get("mtg_session_id")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] get_participants", mtg_session_id=mtg_session_id)
if transcript_id:
emit_progress(
transcript_id, "get_participants", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not mtg_session_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing mtg_session_id"
return task_result
if not settings.DAILY_API_KEY:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "DAILY_API_KEY not configured"
return task_result
import asyncio
async def _fetch():
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
return await client.get_meeting_participants(mtg_session_id)
try:
response = asyncio.run(_fetch())
participants = [
{
"participant_id": p.participant_id,
"user_name": p.user_name,
"user_id": p.user_id,
}
for p in response.data
]
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"participants": participants}
logger.info(
"[Worker] get_participants complete",
mtg_session_id=mtg_session_id,
count=len(participants),
)
if transcript_id:
emit_progress(
transcript_id,
"get_participants",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] get_participants failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "get_participants", "failed", task.workflow_instance_id
)
return task_result

@@ -1,90 +0,0 @@
"""Conductor worker: get_recording - Fetch recording metadata from Daily.co API."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.dailyco_api.client import DailyApiClient
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="get_recording")
def get_recording(task: Task) -> TaskResult:
"""Fetch recording metadata from Daily.co API.
Input:
recording_id: str - Daily.co recording identifier
transcript_id: str - Transcript ID for progress tracking
Output:
id: str - Recording ID
mtg_session_id: str - Meeting session ID
room_name: str - Room name
duration: int - Recording duration in seconds
"""
recording_id = task.input_data.get("recording_id")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] get_recording", recording_id=recording_id)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not recording_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing recording_id"
return task_result
if not settings.DAILY_API_KEY:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "DAILY_API_KEY not configured"
return task_result
import asyncio
async def _fetch():
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
return await client.get_recording(recording_id)
try:
recording = asyncio.run(_fetch())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"id": recording.id,
"mtg_session_id": recording.mtgSessionId,
"room_name": recording.room_name,
"duration": recording.duration,
}
logger.info(
"[Worker] get_recording complete",
recording_id=recording_id,
room_name=recording.room_name,
duration=recording.duration,
)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] get_recording failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "failed", task.workflow_instance_id
)
return task_result

@@ -1,89 +0,0 @@
"""Conductor worker: merge_transcripts - Merge multiple track transcriptions."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="merge_transcripts")
def merge_transcripts(task: Task) -> TaskResult:
"""Merge multiple track transcriptions into single timeline sorted by timestamp.
Input:
transcripts: list[dict] - List of transcription results with words
transcript_id: str - Transcript ID
Output:
all_words: list[dict] - Merged and sorted words
word_count: int - Total word count
"""
transcripts = task.input_data.get("transcripts", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] merge_transcripts",
transcript_count=len(transcripts)
if isinstance(transcripts, (list, dict))
else 0,
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "merge_transcripts", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
try:
all_words = []
# Handle JOIN output (dict with task refs as keys)
if isinstance(transcripts, dict):
transcripts = list(transcripts.values())
for t in transcripts:
if isinstance(t, list):
all_words.extend(t)
elif isinstance(t, dict) and "words" in t:
all_words.extend(t["words"])
# Sort by start timestamp
all_words.sort(key=lambda w: w.get("start", 0))
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"all_words": all_words,
"word_count": len(all_words),
}
logger.info(
"[Worker] merge_transcripts complete",
transcript_id=transcript_id,
word_count=len(all_words),
)
if transcript_id:
emit_progress(
transcript_id,
"merge_transcripts",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] merge_transcripts failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "merge_transcripts", "failed", task.workflow_instance_id
)
return task_result

@@ -1,278 +0,0 @@
"""Conductor worker: mixdown_tracks - Mix multiple audio tracks into single file.
Builds PyAV filter graph with amix filter to combine N padded tracks into
a single stereo MP3 file.
"""
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.storage import get_transcripts_storage
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
MP3_BITRATE = 192000
def _build_mixdown_filter_graph(containers: list, target_sample_rate: int):
"""Build PyAV filter graph: N abuffer -> amix -> aformat -> sink.
Args:
containers: List of PyAV containers for input tracks
target_sample_rate: Output sample rate
Returns:
Tuple of (graph, inputs list, sink)
"""
graph = av.filter.Graph()
inputs = []
for idx in range(len(containers)):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
# amix with normalize=0 to prevent volume reduction
mixer = graph.add("amix", args=f"inputs={len(containers)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s16:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
for idx, in_ctx in enumerate(inputs):
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
return graph, inputs, sink
@worker_task(task_definition_name="mixdown_tracks")
def mixdown_tracks(task: Task) -> TaskResult:
"""Mix multiple audio tracks into single stereo file.
Input:
padded_urls: list[str] - Presigned URLs of padded tracks
transcript_id: str - Transcript ID for storage path
Output:
audio_key: str - S3 key of mixed audio file
duration: float - Audio duration in seconds
size: int - File size in bytes
"""
padded_urls = task.input_data.get("padded_urls", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] mixdown_tracks",
track_count=len(padded_urls),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not padded_urls or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing padded_urls or transcript_id"
return task_result
import asyncio
async def _process():
storage = get_transcripts_storage()
# Determine target sample rate from first track
target_sample_rate = None
for url in padded_urls:
if not url:
continue
try:
with av.open(url) as container:
for frame in container.decode(audio=0):
target_sample_rate = frame.sample_rate
break
except Exception:
continue
if target_sample_rate:
break
if not target_sample_rate:
raise Exception("Mixdown failed: No decodable audio frames in any track")
# Open all containers with reconnect options for S3 streaming
containers = []
valid_urls = [url for url in padded_urls if url]
for url in valid_urls:
try:
c = av.open(
url,
options={
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
logger.warning(
"Mixdown: failed to open container", url=url[:50], error=str(e)
)
if not containers:
raise Exception("Mixdown failed: Could not open any track containers")
try:
# Build filter graph
graph, inputs, sink = _build_mixdown_filter_graph(
containers, target_sample_rate
)
# Create temp file for output
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_path = temp_file.name
try:
# Open output container for MP3
with av.open(temp_path, "w", format="mp3") as out_container:
out_stream = out_container.add_stream(
"libmp3lame", rate=target_sample_rate
)
out_stream.bit_rate = MP3_BITRATE
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(
format="s32", layout="stereo", rate=target_sample_rate
)
for _ in decoders
]
duration_samples = 0
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
duration_samples += mixed.samples
for packet in out_stream.encode(mixed):
out_container.mux(packet)
# Flush remaining
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
duration_samples += mixed.samples
for packet in out_stream.encode(mixed):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
# Get file size and duration
file_size = Path(temp_path).stat().st_size
duration = (
duration_samples / target_sample_rate if target_sample_rate else 0
)
# Upload to S3
storage_path = f"{transcript_id}/audio.mp3"
with open(temp_path, "rb") as mp3_file:
await storage.put_file(storage_path, mp3_file)
finally:
Path(temp_path).unlink(missing_ok=True)
finally:
for c in containers:
try:
c.close()
except Exception:
pass
return {
"audio_key": storage_path,
"duration": duration,
"size": file_size,
}
try:
result = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = result
logger.info(
"[Worker] mixdown_tracks complete",
audio_key=result["audio_key"],
duration=result["duration"],
size=result["size"],
)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] mixdown_tracks failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "failed", task.workflow_instance_id
)
return task_result

@@ -1,322 +0,0 @@
"""Conductor worker: pad_track - Pad audio track with silence for alignment.
This worker extracts stream.start_time from WebM container metadata and applies
silence padding using a PyAV filter graph (adelay). The padded audio is uploaded
to S3 and a presigned URL is returned.
"""
import math
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
# Audio constants matching existing pipeline
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 64000
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
def _extract_stream_start_time_from_container(container, track_idx: int) -> float:
"""Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
Args:
container: PyAV container object
track_idx: Track index for logging
Returns:
Start time in seconds (0.0 if not found)
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def _apply_audio_padding_to_file(
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph.
Filter chain: abuffer -> aresample -> adelay -> abuffersink
Args:
in_container: PyAV input container
output_path: Path to write padded output
start_time_seconds: Amount of silence to prepend
track_idx: Track index for logging
"""
delay_ms = math.floor(start_time_seconds * 1000)
logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush remaining frames
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
@worker_task(task_definition_name="pad_track")
def pad_track(task: Task) -> TaskResult:
"""Pad audio track with silence for alignment.
Input:
track_index: int - Index of the track
s3_key: str - S3 key of the source audio file
bucket_name: str - S3 bucket name
transcript_id: str - Transcript ID for storage path
Output:
padded_url: str - Presigned URL of padded track
size: int - File size in bytes
track_index: int - Track index (echoed back)
"""
track_index = task.input_data.get("track_index", 0)
s3_key = task.input_data.get("s3_key")
bucket_name = task.input_data.get("bucket_name")
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] pad_track",
track_index=track_index,
s3_key=s3_key,
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not s3_key or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing s3_key or transcript_id"
return task_result
import asyncio
async def _process():
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
# Get presigned URL for source file
source_url = await storage.get_file_url(
s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
# Open container and extract start time
with av.open(source_url) as in_container:
start_time_seconds = _extract_stream_start_time_from_container(
in_container, track_index
)
# If no padding needed, return original URL
if start_time_seconds <= 0:
logger.info(
f"Track {track_index} requires no padding",
track_index=track_index,
)
return {
"padded_url": source_url,
"size": 0,
"track_index": track_index,
}
# Create temp file for padded output
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
try:
_apply_audio_padding_to_file(
in_container, temp_path, start_time_seconds, track_index
)
# Get file size
file_size = Path(temp_path).stat().st_size
# Upload using storage layer (use separate path in shadow mode to avoid conflicts)
storage_path = f"file_pipeline_conductor/{transcript_id}/tracks/padded_{track_index}.webm"
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
upload_result = await storage.put_file(storage_path, padded_file)
logger.info(
f"storage.put_file returned",
result=str(upload_result),
)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
# Get presigned URL for padded file
padded_url = await storage.get_file_url(
storage_path,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
return {
"padded_url": padded_url,
"size": file_size,
"track_index": track_index,
}
try:
result = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = result
logger.info(
"[Worker] pad_track complete",
track_index=track_index,
padded_url=result["padded_url"][:50] + "...",
)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] pad_track failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "failed", task.workflow_instance_id
)
return task_result
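
The padding math above is small enough to show in isolation: stream.start_time (in the stream's time_base) becomes a whole-millisecond delay, and adelay wants one delay per channel joined with '|'. A standalone sketch under the same stereo assumptions (helper name is illustrative):

import math
from fractions import Fraction


def adelay_filter_args(start_time: int | None, time_base: Fraction | None) -> str:
    """Build adelay arguments for a stereo track from stream metadata."""
    seconds = float(start_time * time_base) if start_time and time_base else 0.0
    delay_ms = math.floor(seconds * 1000)
    # One delay value per channel, separated by '|'; all=1 delays every channel.
    return f"delays={delay_ms}|{delay_ms}:all=1"


# A track whose stream started 1.5s into the meeting (time_base of 1/1000):
assert adelay_filter_args(1500, Fraction(1, 1000)) == "delays=1500|1500:all=1"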

View File

@@ -1,101 +0,0 @@
"""Conductor worker: post_zulip - Post or update Zulip message with transcript summary."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="post_zulip")
def post_zulip(task: Task) -> TaskResult:
"""Post or update a Zulip message with the transcript summary.
Input:
transcript_id: str - Transcript ID
Output:
message_id: str | None - Zulip message ID
"""
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] post_zulip", transcript_id=transcript_id)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
# Check if Zulip is configured
if not settings.ZULIP_REALM or not settings.ZULIP_API_KEY:
logger.info("[Worker] post_zulip: Zulip not configured, skipping")
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"message_id": None}
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings as app_settings
from reflector.zulip import post_transcript_to_zulip
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(app_settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
message_id = await post_transcript_to_zulip(transcript)
return message_id
finally:
await db.disconnect()
_database_context.set(None)
try:
message_id = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"message_id": str(message_id) if message_id else None
}
logger.info(
"[Worker] post_zulip complete",
transcript_id=transcript_id,
message_id=message_id,
)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] post_zulip failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "failed", task.workflow_instance_id
)
return task_result
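
post_zulip above and send_webhook below both rebuild a dedicated database connection per task instead of inheriting the parent process's. Condensed into a reusable helper (the helper itself is illustrative; the context-variable swap is exactly what the removed workers did):

import databases

from reflector.db import _database_context
from reflector.settings import settings


async def run_with_fresh_db(coro_factory):
    """Execute an async callable against a connection private to this task."""
    _database_context.set(None)
    db = databases.Database(settings.DATABASE_URL)
    _database_context.set(db)
    await db.connect()
    try:
        return await coro_factory()
    finally:
        await db.disconnect()
        _database_context.set(None)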

View File

@@ -1,115 +0,0 @@
"""Conductor worker: send_webhook - Send transcript completion webhook."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="send_webhook")
def send_webhook(task: Task) -> TaskResult:
"""Send the transcript completion webhook to the configured URL.
Input:
transcript_id: str - Transcript ID
room_id: str - Room ID
Output:
sent: bool - Whether webhook was sent
status_code: int | None - HTTP status code
"""
transcript_id = task.input_data.get("transcript_id")
room_id = task.input_data.get("room_id")
logger.info("[Worker] send_webhook", transcript_id=transcript_id, room_id=room_id)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
from reflector.worker.webhook import send_transcript_webhook
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
# Get room for webhook URL
room = None
if room_id:
try:
room = await rooms_controller.get_by_id(room_id)
except Exception:
pass
if not room or not room.webhook_url:
logger.info(
"[Worker] send_webhook: No webhook URL configured",
transcript_id=transcript_id,
)
return False, None
status_code = await send_transcript_webhook(transcript, room)
return True, status_code
finally:
await db.disconnect()
_database_context.set(None)
try:
sent, status_code = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"sent": sent,
"status_code": status_code,
}
logger.info(
"[Worker] send_webhook complete",
transcript_id=transcript_id,
sent=sent,
status_code=status_code,
)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] send_webhook failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -1,96 +0,0 @@
"""Conductor worker: transcribe_track - Transcribe audio track using GPU service."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="transcribe_track")
def transcribe_track(task: Task) -> TaskResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper.
Input:
track_index: int - Index of the track
audio_url: str - Presigned URL of the audio file
language: str - Language code (default: "en")
transcript_id: str - Transcript ID for progress tracking
Output:
words: list[dict] - List of transcribed words with timestamps and speaker
track_index: int - Track index (echoed back)
"""
track_index = task.input_data.get("track_index", 0)
audio_url = task.input_data.get("audio_url")
language = task.input_data.get("language", "en")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] transcribe_track", track_index=track_index, language=language)
if transcript_id:
emit_progress(
transcript_id, "transcribe_track", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not audio_url:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing audio_url"
return task_result
import asyncio
async def _process():
from reflector.pipelines.transcription_helpers import (
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, language)
# Tag all words with speaker index
words = []
for word in transcript.words:
word_dict = word.model_dump()
word_dict["speaker"] = track_index
words.append(word_dict)
return words
try:
words = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"words": words,
"track_index": track_index,
}
logger.info(
"[Worker] transcribe_track complete",
track_index=track_index,
word_count=len(words),
)
if transcript_id:
emit_progress(
transcript_id,
"transcribe_track",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] transcribe_track failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "transcribe_track", "failed", task.workflow_instance_id
)
return task_result
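
The per-track transcripts carry no diarization of their own; the track index is the speaker label. That tagging step, pulled out as a tiny helper (illustrative, not the original code):

def tag_words_with_speaker(words, track_index: int) -> list[dict]:
    """Attach the track index as the speaker on every transcribed word."""
    tagged = []
    for word in words:
        word_dict = word.model_dump()  # pydantic model -> plain dict
        word_dict["speaker"] = track_index
        tagged.append(word_dict)
    return tagged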

View File

@@ -1,205 +0,0 @@
{
"name": "diarization_pipeline",
"description": "Reflector multitrack diarization pipeline",
"version": 1,
"schemaVersion": 2,
"inputParameters": [
"recording_id",
"room_name",
"tracks",
"bucket_name",
"transcript_id",
"room_id"
],
"tasks": [
{
"name": "get_recording",
"taskReferenceName": "get_recording",
"type": "SIMPLE",
"inputParameters": {
"recording_id": "${workflow.input.recording_id}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "get_participants",
"taskReferenceName": "get_participants",
"type": "SIMPLE",
"inputParameters": {
"mtg_session_id": "${get_recording.output.mtg_session_id}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_dynamic_fork_tasks",
"taskReferenceName": "generate_padding_tasks",
"type": "SIMPLE",
"inputParameters": {
"tracks": "${workflow.input.tracks}",
"task_type": "pad_track",
"transcript_id": "${workflow.input.transcript_id}",
"bucket_name": "${workflow.input.bucket_name}"
}
},
{
"name": "fork_track_padding",
"taskReferenceName": "fork_track_padding",
"type": "FORK_JOIN_DYNAMIC",
"inputParameters": {
"dynamicTasks": "${generate_padding_tasks.output.tasks}",
"dynamicTasksInput": "${generate_padding_tasks.output.inputs}"
},
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "dynamicTasksInput"
},
{
"name": "join_padding",
"taskReferenceName": "join_padding",
"type": "JOIN"
},
{
"name": "mixdown_tracks",
"taskReferenceName": "mixdown_tracks",
"type": "SIMPLE",
"inputParameters": {
"padded_urls": "${join_padding.output..padded_url}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_waveform",
"taskReferenceName": "generate_waveform",
"type": "SIMPLE",
"inputParameters": {
"audio_key": "${mixdown_tracks.output.audio_key}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_dynamic_fork_tasks",
"taskReferenceName": "generate_transcription_tasks",
"type": "SIMPLE",
"inputParameters": {
"tracks": "${workflow.input.tracks}",
"task_type": "transcribe_track",
"transcript_id": "${workflow.input.transcript_id}",
"padded_urls": "${join_padding.output}"
}
},
{
"name": "fork_transcription",
"taskReferenceName": "fork_transcription",
"type": "FORK_JOIN_DYNAMIC",
"inputParameters": {
"dynamicTasks": "${generate_transcription_tasks.output.tasks}",
"dynamicTasksInput": "${generate_transcription_tasks.output.inputs}"
},
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "dynamicTasksInput"
},
{
"name": "join_transcription",
"taskReferenceName": "join_transcription",
"type": "JOIN"
},
{
"name": "merge_transcripts",
"taskReferenceName": "merge_transcripts",
"type": "SIMPLE",
"inputParameters": {
"transcripts": "${join_transcription.output}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "detect_topics",
"taskReferenceName": "detect_topics",
"type": "SIMPLE",
"inputParameters": {
"words": "${merge_transcripts.output.all_words}",
"transcript_id": "${workflow.input.transcript_id}",
"target_language": "en"
}
},
{
"name": "fork_generation",
"taskReferenceName": "fork_generation",
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "generate_title",
"taskReferenceName": "generate_title",
"type": "SIMPLE",
"inputParameters": {
"topics": "${detect_topics.output.topics}",
"transcript_id": "${workflow.input.transcript_id}"
}
}
],
[
{
"name": "generate_summary",
"taskReferenceName": "generate_summary",
"type": "SIMPLE",
"inputParameters": {
"words": "${merge_transcripts.output.all_words}",
"topics": "${detect_topics.output.topics}",
"transcript_id": "${workflow.input.transcript_id}"
}
}
]
]
},
{
"name": "join_generation",
"taskReferenceName": "join_generation",
"type": "JOIN",
"joinOn": ["generate_title", "generate_summary"]
},
{
"name": "finalize",
"taskReferenceName": "finalize",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"title": "${generate_title.output.title}",
"summary": "${generate_summary.output.summary}",
"short_summary": "${generate_summary.output.short_summary}",
"duration": "${mixdown_tracks.output.duration}"
}
},
{
"name": "cleanup_consent",
"taskReferenceName": "cleanup_consent",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "post_zulip",
"taskReferenceName": "post_zulip",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "send_webhook",
"taskReferenceName": "send_webhook",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"room_id": "${workflow.input.room_id}"
}
}
],
"outputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"title": "${generate_title.output.title}",
"summary": "${generate_summary.output.summary}",
"duration": "${mixdown_tracks.output.duration}",
"word_count": "${merge_transcripts.output.word_count}"
}
}
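
Both FORK_JOIN_DYNAMIC blocks depend on generate_dynamic_fork_tasks emitting two parallel structures: a list of task definitions and a map of per-reference inputs. That generator lives in a file outside this excerpt, so the shape below is a hedged reconstruction following standard Conductor conventions (reference names and the builder are assumptions):

def build_pad_fork(tracks: list[dict], transcript_id: str, bucket_name: str) -> dict:
    """Sketch of the dynamicTasks/dynamicTasksInput pair for the padding fork."""
    refs = [f"pad_track_{i}" for i in range(len(tracks))]
    return {
        "tasks": [
            {"name": "pad_track", "taskReferenceName": ref, "type": "SIMPLE"}
            for ref in refs
        ],
        "inputs": {
            ref: {
                "track_index": i,
                "s3_key": track["s3_key"],
                "bucket_name": bucket_name,
                "transcript_id": transcript_id,
            }
            for i, (ref, track) in enumerate(zip(refs, tracks))
        },
    }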

View File

@@ -1,74 +0,0 @@
"""Register workflow definition with Conductor server."""
import json
from pathlib import Path
import httpx
from reflector.logger import logger
from reflector.settings import settings
def register_workflow() -> None:
"""Register the diarization pipeline workflow with Conductor server.
Raises:
httpx.HTTPStatusError: If registration fails.
"""
workflow_path = Path(__file__).parent / "diarization_pipeline.json"
with open(workflow_path) as f:
workflow = json.load(f)
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/workflow"
logger.info(
"Registering workflow",
name=workflow["name"],
version=workflow["version"],
url=url,
)
with httpx.Client(timeout=30.0) as client:
resp = client.put(
url,
json=[workflow],
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Workflow registered successfully", name=workflow["name"])
async def register_workflow_async() -> None:
"""Async version of register_workflow."""
workflow_path = Path(__file__).parent / "diarization_pipeline.json"
with open(workflow_path) as f:
workflow = json.load(f)
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/workflow"
logger.info(
"Registering workflow",
name=workflow["name"],
version=workflow["version"],
url=url,
)
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.put(
url,
json=[workflow],
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Workflow registered successfully", name=workflow["name"])
if __name__ == "__main__":
register_workflow()
print("Workflow registration complete!")

View File

@@ -22,9 +22,7 @@ recordings = sa.Table(
),
sa.Column("meeting_id", sa.String),
sa.Column("track_keys", sa.JSON, nullable=True),
sa.Column("workflow_id", sa.String, nullable=True),
sa.Index("idx_recording_meeting_id", "meeting_id"),
sa.Index("idx_recording_workflow_id", "workflow_id"),
)
@@ -40,8 +38,6 @@ class Recording(BaseModel):
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
# Conductor workflow ID for tracking pipeline execution
workflow_id: str | None = None
@property
def is_multitrack(self) -> bool:

View File

@@ -12,7 +12,6 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from reflector.conductor.client import ConductorClientManager
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import Transcript
from reflector.hatchet.client import HatchetClientManager
@@ -263,26 +262,6 @@ def dispatch_transcript_processing(
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
durable_started = True
elif settings.CONDUCTOR_ENABLED:
workflow_id = ConductorClientManager.start_workflow(
name="diarization_pipeline",
version=1,
input_data={
"recording_id": config.recording_id,
"room_name": None, # Not available in reprocess path
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
)
logger.info(
"Started Conductor workflow (reprocess)",
workflow_id=workflow_id,
transcript_id=config.transcript_id,
)
durable_started = True
# If durable workflow started and not in shadow mode, skip Celery
if durable_started and not settings.DURABLE_WORKFLOW_SHADOW_MODE:
return None

View File

@@ -151,33 +151,19 @@ class Settings(BaseSettings):
ZULIP_BOT_EMAIL: str | None = None
# Durable workflow orchestration
# Provider: "hatchet" or "conductor" (or "none" to disable)
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
DURABLE_WORKFLOW_SHADOW_MODE: bool = False # Run both provider + Celery
# Conductor workflow orchestration
CONDUCTOR_SERVER_URL: str = "http://conductor:8080/api"
CONDUCTOR_DEBUG: bool = False
# Hatchet workflow orchestration
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def CONDUCTOR_ENABLED(self) -> bool:
"""Legacy compatibility: True if Conductor is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "conductor"
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
@property
def CONDUCTOR_SHADOW_MODE(self) -> bool:
"""Legacy compatibility for shadow mode."""
return self.DURABLE_WORKFLOW_SHADOW_MODE and self.CONDUCTOR_ENABLED
settings = Settings()
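
After this change, Hatchet is the only durable provider Settings knows about. A hedged sketch of the flags involved (values are illustrative; in practice they come from the environment, and any other required settings are assumed to be set there too):

from reflector.settings import Settings

settings = Settings(
    DURABLE_WORKFLOW_PROVIDER="hatchet",
    DURABLE_WORKFLOW_SHADOW_MODE=False,   # run Hatchet only, no Celery shadow run
    HATCHET_CLIENT_TOKEN="…",             # issued by the Hatchet instance
    HATCHET_CLIENT_TLS_STRATEGY="none",
)
assert settings.HATCHET_ENABLED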

View File

@@ -1,45 +0,0 @@
"""Conductor health and status endpoints."""
import httpx
from fastapi import APIRouter
from reflector.settings import settings
router = APIRouter(prefix="/conductor", tags=["conductor"])
@router.get("/health")
async def conductor_health():
"""Check Conductor server connectivity and status."""
if not settings.CONDUCTOR_ENABLED:
return {"status": "disabled", "connected": False}
# Extract base URL (remove /api suffix for health check)
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/").removesuffix("/api")
health_url = f"{base_url}/health"
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(health_url)
if resp.status_code == 200:
return {"status": "healthy", "connected": True}
else:
return {
"status": "unhealthy",
"connected": True,
"error": f"Health check returned {resp.status_code}",
}
except httpx.TimeoutException:
return {
"status": "unhealthy",
"connected": False,
"error": "Connection timeout",
}
except httpx.ConnectError as e:
return {
"status": "unhealthy",
"connected": False,
"error": f"Connection failed: {e}",
}
except Exception as e:
return {"status": "unhealthy", "connected": False, "error": str(e)}

View File

@@ -286,7 +286,7 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Start durable workflow if enabled (Hatchet or Conductor)
# Start durable workflow if enabled (Hatchet)
durable_started = False
if settings.HATCHET_ENABLED:
@@ -309,33 +309,10 @@ async def _process_multitrack_recording_inner(
transcript_id=transcript.id,
)
# Store workflow_id on recording for status tracking
await recordings_controller.update(recording, {"workflow_id": workflow_id})
durable_started = True
elif settings.CONDUCTOR_ENABLED:
from reflector.conductor.client import ConductorClientManager # noqa: PLC0415
workflow_id = ConductorClientManager.start_workflow(
name="diarization_pipeline",
version=1,
input_data={
"recording_id": recording_id,
"room_name": daily_room_name,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
)
# Store workflow_run_id on transcript for replay/resume
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Started Conductor workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
# Store workflow_id on recording for status tracking
await recordings_controller.update(recording, {"workflow_id": workflow_id})
durable_started = True
# If durable workflow started and not in shadow mode, skip Celery