diff --git a/server/migration.md b/server/migration.md
new file mode 100644
index 00000000..931e72d6
--- /dev/null
+++ b/server/migration.md
@@ -0,0 +1,583 @@
+# Celery to TaskIQ Migration Guide
+
+## Executive Summary
+
+This document outlines the migration path from Celery to TaskIQ for the Reflector project. TaskIQ is a modern, async-first distributed task queue that provides comparable functionality to Celery and is designed specifically for async Python applications.
+
+## Current Celery Usage Analysis
+
+### Key Patterns in Use
+1. **Task Decorators**: `@shared_task`, `@asynctask`, `@with_session` decorators
+2. **Task Invocation**: `.delay()`, `.si()` for signatures
+3. **Workflow Patterns**: `chain()`, `group()`, `chord()` for complex pipelines
+4. **Scheduled Tasks**: Celery Beat with crontab and periodic schedules
+5. **Session Management**: Custom `@with_session` and `@with_session_and_transcript` decorators
+6. **Retry Logic**: Auto-retry with exponential backoff
+7. **Redis Backend**: Using Redis for broker and result backend
+
+### Critical Files to Migrate
+- `reflector/worker/app.py` - Celery app configuration and beat schedule
+- `reflector/worker/session_decorator.py` - Session management decorators
+- `reflector/pipelines/main_file_pipeline.py` - File processing pipeline
+- `reflector/pipelines/main_live_pipeline.py` - Live streaming pipeline (10 tasks)
+- `reflector/worker/process.py` - Background processing tasks
+- `reflector/worker/ics_sync.py` - Calendar sync tasks
+- `reflector/worker/cleanup.py` - Cleanup tasks
+- `reflector/worker/webhook.py` - Webhook notifications
+
+## TaskIQ Architecture Mapping
+
+### 1. Installation
+
+```bash
+# Remove Celery dependencies
+uv remove celery flower
+
+# Install TaskIQ with Redis support
+uv add taskiq taskiq-redis taskiq-pipelines
+```
+
+### 2. Broker Configuration
+
+#### Current (Celery)
+```python
+# reflector/worker/app.py
+from celery import Celery
+
+app = Celery(
+    "reflector",
+    broker=settings.CELERY_BROKER_URL,
+    backend=settings.CELERY_RESULT_BACKEND,
+    include=[...],
+)
+```
+
+#### New (TaskIQ)
+```python
+# reflector/worker/broker.py
+from taskiq.middlewares import SimpleRetryMiddleware
+from taskiq_pipelines import PipelineMiddleware
+from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
+
+result_backend = RedisAsyncResultBackend(
+    redis_url=settings.REDIS_URL,
+    result_ex_time=86400,  # 24 hours
+)
+
+broker = RedisStreamBroker(
+    url=settings.REDIS_URL,
+    max_connection_pool_size=10,
+).with_result_backend(result_backend).with_middlewares(
+    PipelineMiddleware(),  # For chain/group/chord support
+    SimpleRetryMiddleware(default_retry_count=3),
+)
+
+# For testing environment
+if os.environ.get("ENVIRONMENT") == "pytest":
+    from taskiq import InMemoryBroker
+    broker = InMemoryBroker(await_inplace=True)
+```
+
+### 3. Task Definition Migration
+
+#### Current (Celery)
+```python
+@shared_task
+@asynctask
+@with_session
+async def task_pipeline_file_process(session: AsyncSession, transcript_id: str):
+    pipeline = PipelineMainFile(transcript_id=transcript_id)
+    await pipeline.process()
+```
+
+#### New (TaskIQ)
+```python
+from reflector.db import get_session
+from reflector.worker.broker import broker
+
+@broker.task
+async def task_pipeline_file_process(transcript_id: str):
+    # Use get_session() so the session can be mocked properly in tests
+    async for session in get_session():
+        pipeline = PipelineMainFile(transcript_id=transcript_id)
+        await pipeline.process()
+```
+
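+Unlike Celery's `.delay()`, invoking a TaskIQ task with `.kiq()` (see section 5 below) only enqueues it and returns a handle; it does not wait for completion. When a caller actually needs the return value — the old `tools/process.py` polled `result.ready()` for this — it can await the handle. A minimal sketch, assuming the Redis result backend configured above:
+
+```python
+handle = await task_pipeline_file_process.kiq(transcript_id=transcript.id)
+result = await handle.wait_result(timeout=600)  # poll the result backend for up to 10 minutes
+if result.is_err:
+    raise RuntimeError("file pipeline failed")
+print(result.return_value)
+```
+
+### 4. 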
Session Management + +#### Current Session Decorators (Keep Using These!) +```python +# reflector/worker/session_decorator.py +def with_session(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + async with get_session_context() as session: + return await func(session, *args, **kwargs) + return wrapper +``` + +#### Session Management Strategy + +**⚠️ CRITICAL**: The key insight is to maintain consistent session management patterns: + +1. **For Worker Tasks**: Continue using `@with_session` decorator pattern +2. **For FastAPI endpoints**: Use `get_session` dependency injection +3. **Never use `get_session_factory()` directly** in application code + +```python +# APPROACH 1: Simple migration keeping decorator pattern +from reflector.worker.session_decorator import with_session + +@taskiq_broker.task +@with_session +async def task_pipeline_file_process(session, *, transcript_id: str): + # Session is provided by decorator, just like Celery version + transcript = await transcripts_controller.get_by_id(session, transcript_id) + pipeline = PipelineMainFile(transcript_id=transcript_id) + await pipeline.process() + +# APPROACH 2: For test compatibility without decorator +from reflector.db import get_session + +@taskiq_broker.task +async def task_pipeline_file_process(transcript_id: str): + # Use get_session which is mocked in tests + async for session in get_session(): + transcript = await transcripts_controller.get_by_id(session, transcript_id) + pipeline = PipelineMainFile(transcript_id=transcript_id) + await pipeline.process() + +# APPROACH 3: Future - TaskIQ dependency injection (after full migration) +from taskiq import TaskiqDepends + +async def get_session_context(): + """Context manager version of get_session for consistency""" + async for session in get_session(): + yield session + +@taskiq_broker.task +async def task_pipeline_file_process( + transcript_id: str, + session: AsyncSession = TaskiqDepends(get_session_context) +): + transcript = await transcripts_controller.get_by_id(session, transcript_id) + pipeline = PipelineMainFile(transcript_id=transcript_id) + await pipeline.process() +``` + +**Key Points:** +- `@with_session` decorator works with TaskIQ tasks (remove `@asynctask`, keep `@with_session`) +- For testing: `get_session()` from `reflector.db` is properly mocked +- Never call `get_session_factory()` directly - always use the abstractions + +### 5. Task Invocation + +#### Current (Celery) +```python +# Simple async execution +task_pipeline_file_process.delay(transcript_id=transcript.id) + +# With signature for chaining +task_cleanup_consent.si(transcript_id=transcript_id) +``` + +#### New (TaskIQ) +```python +# Simple async execution +await task_pipeline_file_process.kiq(transcript_id=transcript.id) + +# With kicker for advanced configuration +await task_cleanup_consent.kicker().with_labels( + priority="high" +).kiq(transcript_id=transcript_id) +``` + +### 6. 
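Workflow Patterns (Chain, Group, Chord)
+
+Before reaching for `taskiq-pipelines`, note that a Celery chain can also be emulated with a single task that runs each step in order — roughly what `task_pipeline_post_sequential` does in this branch. One caveat, sketched below with the task names from the examples that follow (a sketch, not final code): `.kiq()` only enqueues a step, so to keep the chain's ordering the wrapper has to wait for each result before starting the next step (or call the step's coroutine directly).
+
+```python
+@broker.task
+async def run_post_chain(transcript_id: str):
+    # Run the former Celery chain strictly in order
+    for step in (
+        task_cleanup_consent,
+        task_pipeline_post_to_zulip,
+        task_send_webhook_if_needed,
+    ):
+        handle = await step.kiq(transcript_id=transcript_id)
+        await handle.wait_result()  # wait for the step before starting the next one
+```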
+
+#### Current (Celery)
+```python
+from celery import chain, group, chord
+
+# Chain example
+post_chain = chain(
+    task_cleanup_consent.si(transcript_id=transcript_id),
+    task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
+    task_send_webhook_if_needed.si(transcript_id=transcript_id),
+)
+
+# Chord example (parallel + callback)
+chain = chord(
+    group(chain_mp3_and_diarize, chain_title_preview),
+    chain_final_summaries,
+) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
+```
+
+#### New (TaskIQ with Pipelines)
+```python
+import asyncio
+
+from taskiq_pipelines import Pipeline
+
+# Chain example using Pipeline
+post_pipeline = (
+    Pipeline(broker, task_cleanup_consent)
+    .call_next(task_pipeline_post_to_zulip, transcript_id=transcript_id)
+    .call_next(task_send_webhook_if_needed, transcript_id=transcript_id)
+)
+await post_pipeline.kiq(transcript_id=transcript_id)
+
+# Parallel execution: enqueue both branches, then wait for their results
+handles = await asyncio.gather(
+    chain_mp3_and_diarize.kiq(transcript_id),
+    chain_title_preview.kiq(transcript_id),
+)
+results = [await handle.wait_result() for handle in handles]
+
+# Then execute the callback
+await chain_final_summaries.kiq(transcript_id, results)
+await task_pipeline_post_to_zulip.kiq(transcript_id)
+```
+
+### 7. Scheduled Tasks (Celery Beat → TaskIQ Scheduler)
+
+#### Current (Celery Beat)
+```python
+# reflector/worker/app.py
+app.conf.beat_schedule = {
+    "process_messages": {
+        "task": "reflector.worker.process.process_messages",
+        "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
+    },
+    "reprocess_failed_recordings": {
+        "task": "reflector.worker.process.reprocess_failed_recordings",
+        "schedule": crontab(hour=5, minute=0),
+    },
+}
+```
+
+#### New (TaskIQ Scheduler)
+```python
+# reflector/worker/scheduler.py
+from taskiq import TaskiqScheduler
+from taskiq.schedule_sources import LabelScheduleSource
+from taskiq_redis import ListRedisScheduleSource
+
+# Redis source for schedules added at runtime; the label source picks up
+# the static `schedule=[...]` labels declared on the tasks below
+redis_source = ListRedisScheduleSource(settings.REDIS_URL)
+label_source = LabelScheduleSource(broker)
+
+# Define scheduled tasks with decorators. Cron uses the standard five
+# fields, so one minute is the smallest interval it can express.
+@broker.task(schedule=[{"cron": "* * * * *"}])  # was: every SQS_POLLING_TIMEOUT_SECONDS
+async def process_messages():
+    # Task implementation
+    pass
+
+
+@broker.task(schedule=[{"cron": "0 5 * * *"}])  # Daily at 5 AM
+async def reprocess_failed_recordings():
+    # Task implementation
+    pass
+
+
+# Initialize scheduler
+scheduler = TaskiqScheduler(broker, sources=[label_source, redis_source])
+
+# Run scheduler (separate process)
+# taskiq scheduler reflector.worker.scheduler:scheduler
+```
+
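+The remaining beat entries from `reflector/worker/app.py` translate the same way. The mapping below is a sketch, not final code: cron cannot express the 30-second `create_upcoming_meetings` interval, so that task needs a once-a-minute schedule that loops internally or a task that re-enqueues itself, and the `PUBLIC_MODE`/`HEALTHCHECK_URL` conditions from the old beat config have to be re-applied when registering the schedules.
+
+```python
+@broker.task(schedule=[{"cron": "* * * * *"}])  # was: every 60 seconds
+async def sync_all_ics_calendars():
+    ...
+
+
+@broker.task(schedule=[{"cron": "*/10 * * * *"}])  # was: every 600 seconds, HEALTHCHECK_URL only
+async def healthcheck_ping():
+    ...
+
+
+@broker.task(schedule=[{"cron": "0 3 * * *"}])  # was: crontab(hour=3, minute=0), PUBLIC_MODE only
+async def cleanup_old_public_data_task():
+    ...
+```
+
+### 8. 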
Retry Configuration + +#### Current (Celery) +```python +@shared_task( + bind=True, + max_retries=30, + default_retry_delay=60, + retry_backoff=True, + retry_backoff_max=3600, +) +async def task_send_webhook_if_needed(self, ...): + try: + # Task logic + except Exception as exc: + raise self.retry(exc=exc) +``` + +#### New (TaskIQ) +```python +from taskiq.middlewares import SimpleRetryMiddleware + +# Global middleware configuration (1:1 with Celery defaults) +broker = broker.with_middlewares( + SimpleRetryMiddleware(default_retry_count=3), +) + +# For specific tasks with custom retry logic: +@broker.task(retry_on_error=True, max_retries=30) +async def task_send_webhook_if_needed(...): + # Task logic - exceptions auto-retry + pass +``` + +## Testing Migration + +### Current Pytest Setup (Celery) +```python +# tests/conftest.py +@pytest.fixture(scope="session") +def celery_config(): + return { + "broker_url": "memory://", + "result_backend": "cache+memory://", + } + +@pytest.mark.usefixtures("celery_session_app") +@pytest.mark.usefixtures("celery_session_worker") +async def test_task(): + pass +``` + +### New Pytest Setup (TaskIQ) +```python +# tests/conftest.py +import pytest +from taskiq import InMemoryBroker +from reflector.worker.broker import broker + +@pytest.fixture(scope="function", autouse=True) +async def setup_taskiq_broker(): + """Replace broker with InMemoryBroker for testing""" + original_broker = broker + test_broker = InMemoryBroker(await_inplace=True) + + # Copy task registrations + for task_name, task in original_broker._tasks.items(): + test_broker.register_task(task.original_function, task_name=task_name) + + yield test_broker + await test_broker.shutdown() + +@pytest.fixture +async def taskiq_with_db_session(db_session): + """Setup TaskIQ with database session""" + from reflector.worker.broker import broker + broker.add_dependency_context({ + AsyncSession: db_session + }) + yield + broker.custom_dependency_context = {} + +# Test example +@pytest.mark.anyio +async def test_task(taskiq_with_db_session): + result = await task_pipeline_file_process("transcript-id") + assert result is not None +``` + +## Migration Steps + +### Phase 1: Setup (Week 1) +1. **Install TaskIQ packages** + ```bash + uv add taskiq taskiq-redis taskiq-pipelines + ``` + +2. **Create new broker configuration** + - Create `reflector/worker/broker.py` with TaskIQ broker setup + - Create `reflector/worker/dependencies.py` for dependency injection + +3. **Update settings** + - Keep existing Redis configuration + - Add TaskIQ-specific settings if needed + +### Phase 2: Parallel Running (Week 2-3) +1. **Migrate simple tasks first** + - Start with `cleanup.py` (1 task) + - Move to `webhook.py` (1 task) + - Test thoroughly in isolation + +2. **Setup dual-mode operation** + - Keep Celery tasks running + - Add TaskIQ versions alongside + - Use feature flags to switch between them + +### Phase 3: Complex Tasks (Week 3-4) +1. **Migrate pipeline tasks** + - Convert `main_file_pipeline.py` + - Convert `main_live_pipeline.py` (most complex with 10 tasks) + - Ensure chain/group/chord patterns work + +2. **Migrate scheduled tasks** + - Setup TaskIQ scheduler + - Convert beat schedule to TaskIQ schedules + - Test cron patterns + +### Phase 4: Testing & Validation (Week 4-5) +1. **Update test suite** + - Replace Celery fixtures with TaskIQ fixtures + - Update all test files + - Ensure coverage remains the same + +2. 
**Performance testing** + - Compare task execution times + - Monitor Redis memory usage + - Test under load + +### Phase 5: Cutover (Week 5-6) +1. **Final migration** + - Remove Celery dependencies + - Update deployment scripts + - Update documentation + +2. **Monitoring** + - Setup TaskIQ monitoring (if available) + - Create health checks + - Document operational procedures + +## Key Differences to Note + +### Advantages of TaskIQ +1. **Native async support** - No need for `@asynctask` wrapper +2. **Dependency injection** - Cleaner than decorators for session management +3. **Type hints** - Better IDE support and autocompletion +4. **Modern Python** - Designed for Python 3.7+ +5. **Simpler testing** - InMemoryBroker makes testing easier + +### Potential Challenges +1. **Less mature ecosystem** - Fewer third-party integrations +2. **Documentation** - Less comprehensive than Celery +3. **Monitoring tools** - No Flower equivalent (may need custom solution) +4. **Community support** - Smaller community than Celery + +## Command Line Changes + +### Current (Celery) +```bash +# Start worker +celery -A reflector.worker.app worker --loglevel=info + +# Start beat scheduler +celery -A reflector.worker.app beat +``` + +### New (TaskIQ) +```bash +# Start worker +taskiq worker reflector.worker.broker:broker + +# Start scheduler +taskiq scheduler reflector.worker.scheduler:scheduler + +# With custom settings +taskiq worker reflector.worker.broker:broker --workers 4 --log-level INFO +``` + +## Rollback Plan + +If issues arise during migration: + +1. **Keep Celery code in version control** - Tag the last Celery version +2. **Maintain dual broker setup** - Can switch back via environment variable +3. **Database compatibility** - No schema changes required +4. **Redis compatibility** - Both use Redis, easy to switch back + +## Success Criteria + +1. ✅ All tasks migrated and functioning +2. ✅ Test coverage maintained at current levels +3. ✅ Performance equal or better than Celery +4. ✅ Scheduled tasks running reliably +5. ✅ Error handling and retries working correctly +6. ✅ WebSocket notifications still functioning +7. ✅ Pipeline processing maintaining same behavior + +## Monitoring & Operations + +### Health Checks +```python +# reflector/worker/healthcheck.py +@broker.task +async def healthcheck_ping(): + """TaskIQ health check task""" + return {"status": "healthy", "timestamp": datetime.now()} +``` + +### Metrics Collection +- Task execution times +- Success/failure rates +- Queue depths +- Worker utilization + +## Key Implementation Points - MUST READ + +### Critical Changes Required + +1. **Session Management in Tasks** + - ✅ **VERIFIED**: Tasks MUST use `get_session()` from `reflector.db` for test compatibility + - ❌ Do NOT use `get_session_factory()` directly in tasks - it bypasses test mocks + - ✅ The test database session IS properly shared when using `get_session()` + +2. **Task Invocation Changes** + - Replace `.delay()` with `await .kiq()` + - All task invocations become async/await + - No need to commit sessions before task invocation (controllers handle this) + +3. **Broker Configuration** + - TaskIQ broker must be initialized in `worker/app.py` + - Use `InMemoryBroker(await_inplace=True)` for testing + - Use `RedisStreamBroker` for production + +4. **Test Setup Requirements** + - Set `os.environ["ENVIRONMENT"] = "pytest"` at top of test files + - Add TaskIQ broker fixture to test functions + - Keep Celery fixtures for now (dual-mode operation) + +5. 
**Import Pattern Changes** + ```python + # Each file needs both imports during migration + from reflector.pipelines.main_file_pipeline import ( + task_pipeline_file_process, # Celery version + task_pipeline_file_process_taskiq, # TaskIQ version + ) + ``` + +6. **Decorator Changes** + - Remove `@asynctask` - TaskIQ is async-native + - **Keep `@with_session`** - it works with TaskIQ tasks! + - Remove `@shared_task` from TaskIQ version + - Keep `@shared_task` on Celery version for backward compatibility + +## Verified POC Results + +✅ **Database transactions work correctly** across test and TaskIQ tasks +✅ **Tasks execute immediately** in tests with `InMemoryBroker(await_inplace=True)` +✅ **Session mocking works** when using `get_session()` properly +✅ **"OK" output confirmed** - TaskIQ task executes and accesses test data + +## Conclusion + +The migration from Celery to TaskIQ is feasible and offers several advantages for an async-first codebase like Reflector. The key challenges will be: + +1. Migrating complex pipeline patterns (chain/chord) +2. Ensuring scheduled task reliability +3. **SOLVED**: Maintaining session management patterns - use `get_session()` +4. Updating the test suite + +The phased approach allows for gradual migration with minimal risk. The ability to run both systems in parallel provides a safety net during the transition period. + +## Appendix: Quick Reference + +| Celery | TaskIQ | +|--------|--------| +| `@shared_task` | `@broker.task` | +| `.delay()` | `.kiq()` | +| `.apply_async()` | `.kicker().kiq()` | +| `chain()` | `Pipeline()` | +| `group()` | `gather()` | +| `chord()` | `gather() + callback` | +| `@task.retry()` | `retry_on_error=True` | +| Celery Beat | TaskIQ Scheduler | +| `celery worker` | `taskiq worker` | +| Flower | Custom monitoring needed | \ No newline at end of file diff --git a/server/migrations/versions/0ab2d7ffaa16_add_long_summary_to_search_vector.py b/server/migrations/versions/0ab2d7ffaa16_add_long_summary_to_search_vector.py index 990f5932..a6a80791 100644 --- a/server/migrations/versions/0ab2d7ffaa16_add_long_summary_to_search_vector.py +++ b/server/migrations/versions/0ab2d7ffaa16_add_long_summary_to_search_vector.py @@ -23,14 +23,16 @@ def upgrade() -> None: op.drop_column("transcript", "search_vector_en") # Recreate the search vector column with long_summary included - op.execute(""" + op.execute( + """ ALTER TABLE transcript ADD COLUMN search_vector_en tsvector GENERATED ALWAYS AS ( setweight(to_tsvector('english', coalesce(title, '')), 'A') || setweight(to_tsvector('english', coalesce(long_summary, '')), 'B') || setweight(to_tsvector('english', coalesce(webvtt, '')), 'C') ) STORED - """) + """ + ) # Recreate the GIN index for the search vector op.create_index( @@ -47,13 +49,15 @@ def downgrade() -> None: op.drop_column("transcript", "search_vector_en") # Recreate the original search vector column without long_summary - op.execute(""" + op.execute( + """ ALTER TABLE transcript ADD COLUMN search_vector_en tsvector GENERATED ALWAYS AS ( setweight(to_tsvector('english', coalesce(title, '')), 'A') || setweight(to_tsvector('english', coalesce(webvtt, '')), 'B') ) STORED - """) + """ + ) # Recreate the GIN index for the search vector op.create_index( diff --git a/server/migrations/versions/116b2f287eab_add_full_text_search.py b/server/migrations/versions/116b2f287eab_add_full_text_search.py index 98a6b9eb..91f41044 100644 --- a/server/migrations/versions/116b2f287eab_add_full_text_search.py +++ 
b/server/migrations/versions/116b2f287eab_add_full_text_search.py @@ -21,13 +21,15 @@ def upgrade() -> None: if conn.dialect.name != "postgresql": return - op.execute(""" + op.execute( + """ ALTER TABLE transcript ADD COLUMN search_vector_en tsvector GENERATED ALWAYS AS ( setweight(to_tsvector('english', coalesce(title, '')), 'A') || setweight(to_tsvector('english', coalesce(webvtt, '')), 'B') ) STORED - """) + """ + ) op.create_index( "idx_transcript_search_vector_en", diff --git a/server/migrations/versions/2ae3db106d4e_clean_up_orphaned_room_id_references_in_.py b/server/migrations/versions/2ae3db106d4e_clean_up_orphaned_room_id_references_in_.py index c091ab49..ba355edf 100644 --- a/server/migrations/versions/2ae3db106d4e_clean_up_orphaned_room_id_references_in_.py +++ b/server/migrations/versions/2ae3db106d4e_clean_up_orphaned_room_id_references_in_.py @@ -19,12 +19,14 @@ depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # Set room_id to NULL for meetings that reference non-existent rooms - op.execute(""" + op.execute( + """ UPDATE meeting SET room_id = NULL WHERE room_id IS NOT NULL AND room_id NOT IN (SELECT id FROM room WHERE id IS NOT NULL) - """) + """ + ) def downgrade() -> None: diff --git a/server/migrations/versions/d7fbb74b673b_add_room_id_to_transcript.py b/server/migrations/versions/d7fbb74b673b_add_room_id_to_transcript.py index 55cedc54..08e4df6e 100644 --- a/server/migrations/versions/d7fbb74b673b_add_room_id_to_transcript.py +++ b/server/migrations/versions/d7fbb74b673b_add_room_id_to_transcript.py @@ -27,7 +27,8 @@ def upgrade() -> None: # Populate room_id for existing ROOM-type transcripts # This joins through recording -> meeting -> room to get the room_id - op.execute(""" + op.execute( + """ UPDATE transcript AS t SET room_id = r.id FROM recording rec @@ -36,11 +37,13 @@ def upgrade() -> None: WHERE t.recording_id = rec.id AND t.source_kind = 'room' AND t.room_id IS NULL - """) + """ + ) # Fix missing meeting_id for ROOM-type transcripts # The meeting_id field exists but was never populated - op.execute(""" + op.execute( + """ UPDATE transcript AS t SET meeting_id = rec.meeting_id FROM recording rec @@ -48,7 +51,8 @@ def upgrade() -> None: AND t.source_kind = 'room' AND t.meeting_id IS NULL AND rec.meeting_id IS NOT NULL - """) + """ + ) def downgrade() -> None: diff --git a/server/pyproject.toml b/server/pyproject.toml index f075bc71..d46ca311 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -26,7 +26,6 @@ dependencies = [ "prometheus-fastapi-instrumentator>=6.1.0", "sentencepiece>=0.1.99", "protobuf>=4.24.3", - "celery>=5.3.4", "redis>=5.0.1", "python-jose[cryptography]>=3.3.0", "python-multipart>=0.0.6", @@ -39,6 +38,8 @@ dependencies = [ "pytest-env>=1.1.5", "webvtt-py>=0.5.0", "icalendar>=6.0.0", + "taskiq>=0.11.18", + "taskiq-redis>=1.1.0", ] [dependency-groups] @@ -55,7 +56,6 @@ tests = [ "pytest>=7.4.0", "httpx-ws>=0.4.1", "pytest-httpx>=0.23.1", - "pytest-celery>=0.0.0", "pytest-recording>=0.13.4", "pytest-docker>=3.2.3", "asgi-lifespan>=2.1.0", diff --git a/server/reflector/app.py b/server/reflector/app.py index e1d07d20..065b55d1 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -88,8 +88,8 @@ app.include_router(zulip_router, prefix="/v1") app.include_router(whereby_router, prefix="/v1") add_pagination(app) -# prepare celery -from reflector.worker import app as celery_app # noqa +# prepare taskiq +from reflector.worker import app as taskiq_app # noqa # simpler openapi id diff --git 
a/server/reflector/asynctask.py b/server/reflector/asynctask.py deleted file mode 100644 index 409a04f0..00000000 --- a/server/reflector/asynctask.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -import functools - - -def asynctask(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - async def run_async(): - return await f(*args, **kwargs) - - coro = run_async() - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - if loop and loop.is_running(): - return loop.run_until_complete(coro) - return asyncio.run(coro) - - return wrapper diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py index d482c404..8e38001d 100644 --- a/server/reflector/db/__init__.py +++ b/server/reflector/db/__init__.py @@ -1,3 +1,4 @@ +from contextlib import asynccontextmanager from typing import AsyncGenerator from sqlalchemy.ext.asyncio import ( @@ -45,6 +46,18 @@ async def _get_session() -> AsyncGenerator[AsyncSession, None]: async def get_session() -> AsyncGenerator[AsyncSession, None]: + """ + Get a database session, fastapi dependency injection style + """ + async for session in _get_session(): + yield session + + +@asynccontextmanager +async def get_session_context(): + """ + Get a database session as an async context manager + """ async for session in _get_session(): yield session diff --git a/server/reflector/db/calendar_events.py b/server/reflector/db/calendar_events.py index 889f18a0..d9176f9d 100644 --- a/server/reflector/db/calendar_events.py +++ b/server/reflector/db/calendar_events.py @@ -171,9 +171,11 @@ class CalendarEventController: .where( sa.and_( CalendarEventModel.room_id == room_id, - CalendarEventModel.ics_uid.notin_(current_ics_uids) - if current_ics_uids - else True, + ( + CalendarEventModel.ics_uid.notin_(current_ics_uids) + if current_ics_uids + else True + ), CalendarEventModel.end_time > datetime.now(timezone.utc), ) ) diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index e4fe43a7..c3aaa8fe 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -590,8 +590,11 @@ class TranscriptController: """ A context manager for database transaction """ - async with session.begin(): + if session.in_transaction(): yield + else: + async with session.begin(): + yield async def append_event( self, diff --git a/server/reflector/pipelines/main_file_pipeline.py b/server/reflector/pipelines/main_file_pipeline.py index 99ade57e..2d8dcc1c 100644 --- a/server/reflector/pipelines/main_file_pipeline.py +++ b/server/reflector/pipelines/main_file_pipeline.py @@ -12,11 +12,8 @@ from pathlib import Path import av import structlog -from celery import chain, shared_task from sqlalchemy.ext.asyncio import AsyncSession -from reflector.asynctask import asynctask -from reflector.db import get_session_factory from reflector.db.rooms import rooms_controller from reflector.db.transcripts import ( SourceKind, @@ -28,8 +25,8 @@ from reflector.logger import logger from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, broadcast_to_sockets, - task_cleanup_consent, - task_pipeline_post_to_zulip, + task_cleanup_consent_taskiq, + task_pipeline_post_to_zulip_taskiq, ) from reflector.processors import ( AudioFileWriterProcessor, @@ -55,8 +52,9 @@ from reflector.processors.types import ( ) from reflector.settings import settings from reflector.storage import get_transcripts_storage -from reflector.worker.session_decorator import with_session -from reflector.worker.webhook import 
send_transcript_webhook +from reflector.worker.app import taskiq_broker +from reflector.worker.session_decorator import catch_exception, with_session +from reflector.worker.webhook import send_transcript_webhook_taskiq class EmptyPipeline: @@ -98,31 +96,29 @@ class PipelineMainFile(PipelineMainBase): ) @broadcast_to_sockets - async def set_status(self, transcript_id: str, status: TranscriptStatus): - async with self.lock_transaction(): - async with get_session_factory()() as session: - return await transcripts_controller.set_status( - session, transcript_id, status - ) + async def set_status( + self, + session: AsyncSession, + transcript_id: str, + status: TranscriptStatus, + ): + return await transcripts_controller.set_status(session, transcript_id, status) - async def process(self, file_path: Path): + async def process(self, session: AsyncSession, file_path: Path): """Main entry point for file processing""" self.logger.info(f"Starting file pipeline for {file_path}") - async with get_session_factory()() as session: - transcript = await transcripts_controller.get_by_id( - session, self.transcript_id - ) + transcript = await transcripts_controller.get_by_id(session, self.transcript_id) - # Clear transcript as we're going to regenerate everything - await transcripts_controller.update( - session, - transcript, - { - "events": [], - "topics": [], - }, - ) + # Clear transcript as we're going to regenerate everything + await transcripts_controller.update( + session, + transcript, + { + "events": [], + "topics": [], + }, + ) # Extract audio and write to transcript location audio_path = await self.extract_and_write_audio(file_path, transcript) @@ -141,8 +137,7 @@ class PipelineMainFile(PipelineMainBase): self.logger.info("File pipeline complete") - async with get_session_factory()() as session: - await transcripts_controller.set_status(session, transcript.id, "ended") + await transcripts_controller.set_status(session, transcript.id, "ended") async def extract_and_write_audio( self, file_path: Path, transcript: Transcript @@ -393,11 +388,9 @@ class PipelineMainFile(PipelineMainBase): await processor.flush() -@shared_task -@asynctask +@taskiq_broker.task @with_session async def task_send_webhook_if_needed(session, *, transcript_id: str): - """Send webhook if this is a room recording with webhook configured""" transcript = await transcripts_controller.get_by_id(session, transcript_id) if not transcript: return @@ -411,25 +404,23 @@ async def task_send_webhook_if_needed(session, *, transcript_id: str): room_id=room.id, webhook_url=room.webhook_url, ) - send_transcript_webhook.delay( + await send_transcript_webhook_taskiq.kiq( transcript_id, room.id, event_id=uuid.uuid4().hex ) -@shared_task -@asynctask +@taskiq_broker.task +@catch_exception @with_session -async def task_pipeline_file_process(session, *, transcript_id: str): - """Celery task for file pipeline processing""" +async def task_pipeline_file_process(session: AsyncSession, *, transcript_id: str): transcript = await transcripts_controller.get_by_id(session, transcript_id) if not transcript: raise Exception(f"Transcript {transcript_id} not found") pipeline = PipelineMainFile(transcript_id=transcript_id) try: - await pipeline.set_status(transcript_id, "processing") + await pipeline.set_status(session, transcript_id, "processing") - # Find the file to process audio_file = next(transcript.data_path.glob("upload.*"), None) if not audio_file: audio_file = next(transcript.data_path.glob("audio.*"), None) @@ -437,16 +428,17 @@ async def 
task_pipeline_file_process(session, *, transcript_id: str): if not audio_file: raise Exception("No audio file found to process") - await pipeline.process(audio_file) + await pipeline.process(session, audio_file) except Exception: - await pipeline.set_status(transcript_id, "error") + try: + await pipeline.set_status(session, transcript_id, "error") + except: + logger.error( + "Error setting status in task_pipeline_file_process during exception, ignoring it" + ) raise - # Run post-processing chain: consent cleanup -> zulip -> webhook - post_chain = chain( - task_cleanup_consent.si(transcript_id=transcript_id), - task_pipeline_post_to_zulip.si(transcript_id=transcript_id), - task_send_webhook_if_needed.si(transcript_id=transcript_id), - ) - post_chain.delay() + await task_cleanup_consent_taskiq.kiq(transcript_id=transcript_id) + await task_pipeline_post_to_zulip_taskiq.kiq(transcript_id=transcript_id) + await task_send_webhook_if_needed.kiq(transcript_id=transcript_id) diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 882e8cf9..884fb457 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -18,13 +18,11 @@ from typing import Generic import av import boto3 -from celery import chord, current_task, group, shared_task from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from structlog import BoundLogger as Logger -from reflector.asynctask import asynctask -from reflector.db import get_session_factory +from reflector.db import get_session_context from reflector.db.meetings import meeting_consent_controller, meetings_controller from reflector.db.recordings import recordings_controller from reflector.db.rooms import rooms_controller @@ -64,6 +62,7 @@ from reflector.processors.types import ( from reflector.processors.types import Transcript as TranscriptProcessorType from reflector.settings import settings from reflector.storage import get_transcripts_storage +from reflector.worker.app import taskiq_broker from reflector.worker.session_decorator import with_session_and_transcript from reflector.ws_manager import WebsocketManager, get_ws_manager from reflector.zulip import ( @@ -99,21 +98,12 @@ def get_transcript(func): @functools.wraps(func) async def wrapper(**kwargs): transcript_id = kwargs.pop("transcript_id") - async with get_session_factory()() as session: + async with get_session_context() as session: transcript = await transcripts_controller.get_by_id(session, transcript_id) if not transcript: raise Exception(f"Transcript {transcript_id} not found") - # Enhanced logger with Celery task context tlogger = logger.bind(transcript_id=transcript.id) - if current_task: - tlogger = tlogger.bind( - task_id=current_task.request.id, - task_name=current_task.name, - worker_hostname=current_task.request.hostname, - task_retries=current_task.request.retries, - transcript_id=transcript_id, - ) try: result = await func(transcript=transcript, logger=tlogger, **kwargs) @@ -177,8 +167,13 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage] @asynccontextmanager async def transaction(self): async with self.lock_transaction(): - async with get_session_factory()() as session: - yield session + async with get_session_context() as session: + print(">>> SESSION USING", session, session.in_transaction()) + if session.in_transaction(): + yield session + else: + async with session.begin(): + yield session @broadcast_to_sockets async 
def on_status(self, status): @@ -209,7 +204,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage] # when the status of the pipeline changes, update the transcript async with self._lock: - async with get_session_factory()() as session: + async with get_session_context() as session: return await transcripts_controller.set_status( session, self.transcript_id, status ) @@ -344,7 +339,7 @@ class PipelineMainLive(PipelineMainBase): async def create(self) -> Pipeline: # create a context for the whole rtc transaction # add a customised logger to the context - async with get_session_factory()() as session: + async with get_session_context() as session: transcript = await self.get_transcript(session) processors = [ @@ -393,7 +388,7 @@ class PipelineMainDiarization(PipelineMainBase[AudioDiarizationInput]): # now let's start the pipeline by pushing information to the # first processor diarization processor # XXX translation is lost when converting our data model to the processor model - async with get_session_factory()() as session: + async with get_session_context() as session: transcript = await self.get_transcript(session) # diarization works only if the file is uploaded to an external storage @@ -427,7 +422,7 @@ class PipelineMainFromTopics(PipelineMainBase[TitleSummaryWithIdProcessorType]): async def create(self) -> Pipeline: # get transcript - async with get_session_factory()() as session: + async with get_session_context() as session: self._transcript = transcript = await self.get_transcript(session) # create pipeline @@ -707,26 +702,22 @@ async def pipeline_post_to_zulip(session, transcript: Transcript, logger: Logger # =================================================================== -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_remove_upload(*, transcript_id: str): await pipeline_remove_upload(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_waveform(*, transcript_id: str): await pipeline_waveform(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_convert_to_mp3(*, transcript_id: str): await pipeline_convert_to_mp3(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task @with_session_and_transcript async def task_pipeline_upload_mp3( session, *, transcript: Transcript, logger: Logger, transcript_id: str @@ -734,26 +725,22 @@ async def task_pipeline_upload_mp3( await pipeline_upload_mp3(session, transcript=transcript, logger=logger) -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_diarization(*, transcript_id: str): await pipeline_diarization(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_title(*, transcript_id: str): await pipeline_title(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_final_summaries(*, transcript_id: str): await pipeline_summaries(transcript_id=transcript_id) -@shared_task -@asynctask +@taskiq_broker.task @with_session_and_transcript async def task_cleanup_consent( session, *, transcript: Transcript, logger: Logger, transcript_id: str @@ -761,8 +748,7 @@ async def task_cleanup_consent( await cleanup_consent(session, transcript=transcript, logger=logger) -@shared_task -@asynctask +@taskiq_broker.task @with_session_and_transcript async def task_pipeline_post_to_zulip( session, *, transcript: Transcript, logger: Logger, transcript_id: str @@ -770,36 +756,48 @@ async def 
task_pipeline_post_to_zulip( await pipeline_post_to_zulip(session, transcript=transcript, logger=logger) -def pipeline_post(*, transcript_id: str): - """ - Run the post pipeline - """ - chain_mp3_and_diarize = ( - task_pipeline_waveform.si(transcript_id=transcript_id) - | task_pipeline_convert_to_mp3.si(transcript_id=transcript_id) - | task_pipeline_upload_mp3.si(transcript_id=transcript_id) - | task_pipeline_remove_upload.si(transcript_id=transcript_id) - | task_pipeline_diarization.si(transcript_id=transcript_id) - | task_cleanup_consent.si(transcript_id=transcript_id) - ) - chain_title_preview = task_pipeline_title.si(transcript_id=transcript_id) - chain_final_summaries = task_pipeline_final_summaries.si( - transcript_id=transcript_id +@taskiq_broker.task +@with_session_and_transcript +async def task_cleanup_consent_taskiq( + session, *, transcript: Transcript, logger: Logger, transcript_id: str +): + await cleanup_consent(session, transcript=transcript, logger=logger) + + +@taskiq_broker.task +@with_session_and_transcript +async def task_pipeline_post_to_zulip_taskiq( + session, *, transcript: Transcript, logger: Logger, transcript_id: str +): + await pipeline_post_to_zulip(session, transcript=transcript, logger=logger) + + +async def pipeline_post(*, transcript_id: str): + await task_pipeline_post_sequential.kiq(transcript_id=transcript_id) + + +@taskiq_broker.task +async def task_pipeline_post_sequential(*, transcript_id: str): + await task_pipeline_waveform.kiq(transcript_id=transcript_id) + await task_pipeline_convert_to_mp3.kiq(transcript_id=transcript_id) + await task_pipeline_upload_mp3.kiq(transcript_id=transcript_id) + await task_pipeline_remove_upload.kiq(transcript_id=transcript_id) + await task_pipeline_diarization.kiq(transcript_id=transcript_id) + await task_cleanup_consent.kiq(transcript_id=transcript_id) + + await asyncio.gather( + task_pipeline_title.kiq(transcript_id=transcript_id), + task_pipeline_final_summaries.kiq(transcript_id=transcript_id), ) - chain = chord( - group(chain_mp3_and_diarize, chain_title_preview), - chain_final_summaries, - ) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id) - - return chain.delay() + await task_pipeline_post_to_zulip.kiq(transcript_id=transcript_id) @get_transcript async def pipeline_process(transcript: Transcript, logger: Logger): try: if transcript.audio_location == "storage": - async with get_session_factory()() as session: + async with get_session_context() as session: await transcripts_controller.download_mp3_from_storage(transcript) transcript.audio_waveform_filename.unlink(missing_ok=True) await transcripts_controller.update( @@ -840,7 +838,7 @@ async def pipeline_process(transcript: Transcript, logger: Logger): except Exception as exc: logger.error("Pipeline error", exc_info=exc) - async with get_session_factory()() as session: + async with get_session_context() as session: await transcripts_controller.update( session, transcript, @@ -853,7 +851,6 @@ async def pipeline_process(transcript: Transcript, logger: Logger): logger.info("Pipeline ended") -@shared_task -@asynctask +@taskiq_broker.task async def task_pipeline_process(*, transcript_id: str): return await pipeline_process(transcript_id=transcript_id) diff --git a/server/reflector/services/ics_sync.py b/server/reflector/services/ics_sync.py index 1399f1b8..3e2f55dc 100644 --- a/server/reflector/services/ics_sync.py +++ b/server/reflector/services/ics_sync.py @@ -248,15 +248,21 @@ class ICSFetchService: ) att_data: AttendeeData = { "email": clean_email, - 
"name": att.params.get("CN") - if hasattr(att, "params") and email == email_parts[0] - else None, - "status": att.params.get("PARTSTAT") - if hasattr(att, "params") and email == email_parts[0] - else None, - "role": att.params.get("ROLE") - if hasattr(att, "params") and email == email_parts[0] - else None, + "name": ( + att.params.get("CN") + if hasattr(att, "params") and email == email_parts[0] + else None + ), + "status": ( + att.params.get("PARTSTAT") + if hasattr(att, "params") and email == email_parts[0] + else None + ), + "role": ( + att.params.get("ROLE") + if hasattr(att, "params") and email == email_parts[0] + else None + ), } final_attendees.append(att_data) else: @@ -264,9 +270,9 @@ class ICSFetchService: att_data: AttendeeData = { "email": email_str, "name": att.params.get("CN") if hasattr(att, "params") else None, - "status": att.params.get("PARTSTAT") - if hasattr(att, "params") - else None, + "status": ( + att.params.get("PARTSTAT") if hasattr(att, "params") else None + ), "role": att.params.get("ROLE") if hasattr(att, "params") else None, } final_attendees.append(att_data) @@ -281,9 +287,9 @@ class ICSFetchService: ) org_data: AttendeeData = { "email": org_email, - "name": organizer.params.get("CN") - if hasattr(organizer, "params") - else None, + "name": ( + organizer.params.get("CN") if hasattr(organizer, "params") else None + ), "role": "ORGANIZER", } final_attendees.append(org_data) diff --git a/server/reflector/tools/exportdanswer.py b/server/reflector/tools/exportdanswer.py index 2f84242a..842687ab 100644 --- a/server/reflector/tools/exportdanswer.py +++ b/server/reflector/tools/exportdanswer.py @@ -9,11 +9,10 @@ async def export_db(filename: str) -> None: filename = pathlib.Path(filename).resolve() settings.DATABASE_URL = f"sqlite:///{filename}" - from reflector.db import get_session_factory + from reflector.db import get_session_context from reflector.db.transcripts import transcripts_controller - session_factory = get_session_factory() - async with session_factory() as session: + async with get_session_context() as session: transcripts = await transcripts_controller.get_all(session) def export_transcript(transcript, output_dir): diff --git a/server/reflector/tools/exportdb.py b/server/reflector/tools/exportdb.py index 2948813c..658f4642 100644 --- a/server/reflector/tools/exportdb.py +++ b/server/reflector/tools/exportdb.py @@ -8,11 +8,10 @@ async def export_db(filename: str) -> None: filename = pathlib.Path(filename).resolve() settings.DATABASE_URL = f"sqlite:///{filename}" - from reflector.db import get_session_factory + from reflector.db import get_session_context from reflector.db.transcripts import transcripts_controller - session_factory = get_session_factory() - async with session_factory() as session: + async with get_session_context() as session: transcripts = await transcripts_controller.get_all(session) def export_transcript(transcript): diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py index e5b9ba4d..c6e6b3cf 100644 --- a/server/reflector/tools/process.py +++ b/server/reflector/tools/process.py @@ -7,13 +7,12 @@ import asyncio import json import shutil import sys -import time from pathlib import Path from typing import Any, Dict, List, Literal from sqlalchemy.ext.asyncio import AsyncSession -from reflector.db import get_session_factory +from reflector.db import get_session_context from reflector.db.transcripts import SourceKind, TranscriptTopic, transcripts_controller from reflector.logger import logger from 
reflector.pipelines.main_file_pipeline import ( @@ -140,13 +139,7 @@ async def process_live_pipeline( # assert documented behaviour: after process, the pipeline isn't ended. this is the reason of calling pipeline_post assert pre_final_transcript.status != "ended" - # at this point, diarization is running but we have no access to it. run diarization in parallel - one will hopefully win after polling - result = live_pipeline_post(transcript_id=transcript_id) - - # result.ready() blocks even without await; it mutates result also - while not result.ready(): - print(f"Status: {result.state}") - time.sleep(2) + await live_pipeline_post(transcript_id=transcript_id) async def process_file_pipeline( @@ -154,13 +147,7 @@ async def process_file_pipeline( ): """Process audio/video file using the optimized file pipeline""" - # task_pipeline_file_process is a Celery task, need to use .delay() for async execution - result = task_pipeline_file_process.delay(transcript_id=transcript_id) - - # Wait for the Celery task to complete - while not result.ready(): - print(f"File pipeline status: {result.state}", file=sys.stderr) - time.sleep(2) + await task_pipeline_file_process.kiq(transcript_id=transcript_id) logger.info("File pipeline processing complete") @@ -172,8 +159,7 @@ async def process( pipeline: Literal["live", "file"], output_path: str = None, ): - session_factory = get_session_factory() - async with session_factory() as session: + async with get_session_context() as session: transcript_id = await prepare_entry( session, source_path, diff --git a/server/reflector/tools/start_post_main_live_pipeline.py b/server/reflector/tools/start_post_main_live_pipeline.py index 859f03a4..57c56315 100644 --- a/server/reflector/tools/start_post_main_live_pipeline.py +++ b/server/reflector/tools/start_post_main_live_pipeline.py @@ -1,14 +1,10 @@ import argparse +import asyncio -from reflector.app import celery_app # noqa -from reflector.pipelines.main_live_pipeline import task_pipeline_main_post +from reflector.pipelines.main_live_pipeline import pipeline_post parser = argparse.ArgumentParser() parser.add_argument("transcript_id", type=str) -parser.add_argument("--delay", action="store_true") args = parser.parse_args() -if args.delay: - task_pipeline_main_post.delay(args.transcript_id) -else: - task_pipeline_main_post(args.transcript_id) +asyncio.run(pipeline_post(transcript_id=args.transcript_id)) diff --git a/server/reflector/views/transcripts_process.py b/server/reflector/views/transcripts_process.py index 5750829e..8900bb09 100644 --- a/server/reflector/views/transcripts_process.py +++ b/server/reflector/views/transcripts_process.py @@ -1,6 +1,5 @@ from typing import Annotated, Optional -import celery from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -36,24 +35,6 @@ async def transcript_process( status_code=400, detail="Recording is not ready for processing" ) - if task_is_scheduled_or_active( - "reflector.pipelines.main_file_pipeline.task_pipeline_file_process", - transcript_id=transcript_id, - ): - return ProcessStatus(status="already running") - - # schedule a background task process the file - task_pipeline_file_process.delay(transcript_id=transcript_id) + await task_pipeline_file_process.kiq(transcript_id=transcript_id) return ProcessStatus(status="ok") - - -def task_is_scheduled_or_active(task_name: str, **kwargs): - inspect = celery.current_app.control.inspect() - - for worker, tasks in (inspect.scheduled() | 
inspect.active()).items(): - for task in tasks: - if task["name"] == task_name and task["kwargs"] == kwargs: - return True - - return False diff --git a/server/reflector/views/transcripts_upload.py b/server/reflector/views/transcripts_upload.py index 28fd1d4e..9f1cd5ad 100644 --- a/server/reflector/views/transcripts_upload.py +++ b/server/reflector/views/transcripts_upload.py @@ -94,7 +94,6 @@ async def transcript_record_upload( # set the status to "uploaded" await transcripts_controller.update(session, transcript, {"status": "uploaded"}) - # launch a background task to process the file - task_pipeline_file_process.delay(transcript_id=transcript_id) + await task_pipeline_file_process.kiq(transcript_id=transcript_id) return UploadStatus(status="ok") diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 3c7795a2..1ec3032e 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -1,68 +1,21 @@ -import celery +import os + import structlog -from celery import Celery -from celery.schedules import crontab +from taskiq import InMemoryBroker +from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker from reflector.settings import settings logger = structlog.get_logger(__name__) -if celery.current_app.main != "default": - logger.info(f"Celery already configured ({celery.current_app})") - app = celery.current_app + +env = os.environ.get("ENVIRONMENT") +if env and env == "pytest": + taskiq_broker = InMemoryBroker(await_inplace=True) else: - app = Celery(__name__) - app.conf.broker_url = settings.CELERY_BROKER_URL - app.conf.result_backend = settings.CELERY_RESULT_BACKEND - app.conf.broker_connection_retry_on_startup = True - app.autodiscover_tasks( - [ - "reflector.pipelines.main_live_pipeline", - "reflector.worker.healthcheck", - "reflector.worker.process", - "reflector.worker.cleanup", - "reflector.worker.ics_sync", - ] + result_backend = RedisAsyncResultBackend( + redis_url=settings.CELERY_BROKER_URL, + result_ex_time=86400, ) - - # crontab - app.conf.beat_schedule = { - "process_messages": { - "task": "reflector.worker.process.process_messages", - "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS), - }, - "process_meetings": { - "task": "reflector.worker.process.process_meetings", - "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS), - }, - "reprocess_failed_recordings": { - "task": "reflector.worker.process.reprocess_failed_recordings", - "schedule": crontab(hour=5, minute=0), # Midnight EST - }, - "sync_all_ics_calendars": { - "task": "reflector.worker.ics_sync.sync_all_ics_calendars", - "schedule": 60.0, # Run every minute to check which rooms need sync - }, - "create_upcoming_meetings": { - "task": "reflector.worker.ics_sync.create_upcoming_meetings", - "schedule": 30.0, # Run every 30 seconds to create upcoming meetings - }, - } - - if settings.PUBLIC_MODE: - app.conf.beat_schedule["cleanup_old_public_data"] = { - "task": "reflector.worker.cleanup.cleanup_old_public_data_task", - "schedule": crontab(hour=3, minute=0), - } - logger.info( - "Public mode cleanup enabled", - retention_days=settings.PUBLIC_DATA_RETENTION_DAYS, - ) - - if settings.HEALTHCHECK_URL: - app.conf.beat_schedule["healthcheck_ping"] = { - "task": "reflector.worker.healthcheck.healthcheck_ping", - "schedule": 60.0 * 10, - } - logger.info("Healthcheck enabled", url=settings.HEALTHCHECK_URL) - else: - logger.warning("Healthcheck disabled, no url configured") + taskiq_broker = RedisStreamBroker( + url=settings.CELERY_BROKER_URL, + 
).with_result_backend(result_backend) diff --git a/server/reflector/worker/cleanup.py b/server/reflector/worker/cleanup.py index f55f3acd..8bf741af 100644 --- a/server/reflector/worker/cleanup.py +++ b/server/reflector/worker/cleanup.py @@ -9,16 +9,15 @@ from datetime import datetime, timedelta, timezone from typing import TypedDict import structlog -from celery import shared_task from pydantic.types import PositiveInt from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession -from reflector.asynctask import asynctask from reflector.db.base import MeetingModel, RecordingModel, TranscriptModel from reflector.db.transcripts import transcripts_controller from reflector.settings import settings from reflector.storage import get_recordings_storage +from reflector.worker.app import taskiq_broker from reflector.worker.session_decorator import with_session logger = structlog.get_logger(__name__) @@ -156,11 +155,7 @@ async def cleanup_old_public_data( return stats -@shared_task( - autoretry_for=(Exception,), - retry_kwargs={"max_retries": 3, "countdown": 300}, -) -@asynctask +@taskiq_broker.task @with_session async def cleanup_old_public_data_task(session: AsyncSession, days: int | None = None): await cleanup_old_public_data(session, days=days) diff --git a/server/reflector/worker/healthcheck.py b/server/reflector/worker/healthcheck.py index 6cb16f38..d5744a38 100644 --- a/server/reflector/worker/healthcheck.py +++ b/server/reflector/worker/healthcheck.py @@ -1,13 +1,13 @@ import httpx import structlog -from celery import shared_task from reflector.settings import settings +from reflector.worker.app import taskiq_broker logger = structlog.get_logger(__name__) -@shared_task +@taskiq_broker.task def healthcheck_ping(): url = settings.HEALTHCHECK_URL if not url: diff --git a/server/reflector/worker/ics_sync.py b/server/reflector/worker/ics_sync.py index a46cee01..fb21aff6 100644 --- a/server/reflector/worker/ics_sync.py +++ b/server/reflector/worker/ics_sync.py @@ -1,24 +1,21 @@ from datetime import datetime, timedelta, timezone import structlog -from celery import shared_task -from celery.utils.log import get_task_logger from sqlalchemy.ext.asyncio import AsyncSession -from reflector.asynctask import asynctask from reflector.db.calendar_events import calendar_events_controller from reflector.db.meetings import meetings_controller from reflector.db.rooms import rooms_controller from reflector.redis_cache import RedisAsyncLock from reflector.services.ics_sync import SyncStatus, ics_sync_service from reflector.whereby import create_meeting, upload_logo +from reflector.worker.app import taskiq_broker from reflector.worker.session_decorator import with_session -logger = structlog.wrap_logger(get_task_logger(__name__)) +logger = structlog.get_logger(__name__) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def sync_room_ics(session: AsyncSession, room_id: str): try: @@ -56,8 +53,7 @@ async def sync_room_ics(session: AsyncSession, room_id: str): logger.error("Unexpected error during ICS sync", room_id=room_id, error=str(e)) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def sync_all_ics_calendars(session: AsyncSession): try: @@ -71,7 +67,7 @@ async def sync_all_ics_calendars(session: AsyncSession): logger.debug("Skipping room, not time to sync yet", room_id=room.id) continue - sync_room_ics.delay(room.id) + await sync_room_ics.kiq(room.id) logger.info("Queued sync tasks for all eligible rooms") @@ -151,8 +147,7 @@ async def 
create_upcoming_meetings_for_event( ) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def create_upcoming_meetings(session: AsyncSession): async with RedisAsyncLock("create_upcoming_meetings", skip_if_locked=True) as lock: diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 7284b5e8..2cae6593 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -6,8 +6,6 @@ from urllib.parse import unquote import av import boto3 import structlog -from celery import shared_task -from celery.utils.log import get_task_logger from pydantic import ValidationError from redis.exceptions import LockError from sqlalchemy.ext.asyncio import AsyncSession @@ -17,13 +15,13 @@ from reflector.db.recordings import Recording, recordings_controller from reflector.db.rooms import rooms_controller from reflector.db.transcripts import SourceKind, transcripts_controller from reflector.pipelines.main_file_pipeline import task_pipeline_file_process -from reflector.pipelines.main_live_pipeline import asynctask from reflector.redis_cache import get_redis_client from reflector.settings import settings from reflector.whereby import get_room_sessions +from reflector.worker.app import taskiq_broker from reflector.worker.session_decorator import with_session -logger = structlog.wrap_logger(get_task_logger(__name__)) +logger = structlog.get_logger(__name__) def parse_datetime_with_timezone(iso_string: str) -> datetime: @@ -34,8 +32,8 @@ def parse_datetime_with_timezone(iso_string: str) -> datetime: return dt -@shared_task -def process_messages(): +@taskiq_broker.task +async def process_messages(): queue_url = settings.AWS_PROCESS_RECORDING_QUEUE_URL if not queue_url: logger.warning("No process recording queue url") @@ -66,7 +64,7 @@ def process_messages(): if record["eventName"].startswith("ObjectCreated"): bucket = record["s3"]["bucket"]["name"] key = unquote(record["s3"]["object"]["key"]) - process_recording.delay(bucket, key) + await process_recording.kiq(bucket, key) sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) logger.info("Processed and deleted message: %s", message) @@ -75,8 +73,7 @@ def process_messages(): logger.error("process_messages", error=str(e)) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def process_recording(session: AsyncSession, bucket_name: str, object_key: str): logger.info("Processing recording: %s/%s", bucket_name, object_key) @@ -155,11 +152,10 @@ async def process_recording(session: AsyncSession, bucket_name: str, object_key: await transcripts_controller.update(session, transcript, {"status": "uploaded"}) - task_pipeline_file_process.delay(transcript_id=transcript.id) + await task_pipeline_file_process.kiq(transcript_id=transcript.id) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def process_meetings(session: AsyncSession): """ @@ -253,8 +249,7 @@ async def process_meetings(session: AsyncSession): ) -@shared_task -@asynctask +@taskiq_broker.task @with_session async def reprocess_failed_recordings(session: AsyncSession): """ @@ -291,7 +286,7 @@ async def reprocess_failed_recordings(session: AsyncSession): ) if not recording: logger.info(f"Queueing recording for processing: {object_key}") - process_recording.delay(bucket_name, object_key) + await process_recording.kiq(bucket_name, object_key) reprocessed_count += 1 continue @@ -310,7 +305,7 @@ async def reprocess_failed_recordings(session: AsyncSession): if transcript is None or transcript.status == 
"error": logger.info(f"Queueing recording for processing: {object_key}") - process_recording.delay(bucket_name, object_key) + await process_recording.kiq(bucket_name, object_key) reprocessed_count += 1 except Exception as e: diff --git a/server/reflector/worker/session_decorator.py b/server/reflector/worker/session_decorator.py index 70d47b75..be1a2f22 100644 --- a/server/reflector/worker/session_decorator.py +++ b/server/reflector/worker/session_decorator.py @@ -8,9 +8,7 @@ that stays open for the entire duration of the task execution. import functools from typing import Any, Callable, TypeVar -from celery import current_task - -from reflector.db import get_session_factory +from reflector.db import get_session_context from reflector.db.transcripts import transcripts_controller from reflector.logger import logger @@ -21,12 +19,11 @@ def with_session(func: F) -> F: """ Decorator that provides an AsyncSession as the first argument to the decorated function. - This should be used AFTER the @asynctask decorator on Celery tasks to ensure - proper session management throughout the task execution. + This should be used with TaskIQ tasks to ensure proper session management + throughout the task execution. Example: - @shared_task - @asynctask + @taskiq_broker.task @with_session async def my_task(session: AsyncSession, arg1: str, arg2: int): # session is automatically provided and managed @@ -36,11 +33,9 @@ def with_session(func: F) -> F: @functools.wraps(func) async def wrapper(*args, **kwargs): - session_factory = get_session_factory() - async with session_factory() as session: - async with session.begin(): - # Pass session as first argument to the decorated function - return await func(session, *args, **kwargs) + async with get_session_context() as session: + # Pass session as first argument to the decorated function + return await func(session, *args, **kwargs) return wrapper @@ -56,11 +51,10 @@ def with_session_and_transcript(func: F) -> F: 4. Creates an enhanced logger with Celery task context 5. Passes session, transcript, and logger to the decorated function - This should be used AFTER the @asynctask decorator on Celery tasks. + This should be used with TaskIQ tasks. 
Example: - @shared_task - @asynctask + @taskiq_broker.task @with_session_and_transcript async def my_task(session: AsyncSession, transcript: Transcript, logger: Logger, arg1: str): # session, transcript, and logger are automatically provided @@ -76,34 +70,43 @@ def with_session_and_transcript(func: F) -> F: "transcript_id is required for @with_session_and_transcript" ) - session_factory = get_session_factory() - async with session_factory() as session: - async with session.begin(): - # Fetch the transcript - transcript = await transcripts_controller.get_by_id( - session, transcript_id + async with get_session_context() as session: + # Fetch the transcript + transcript = await transcripts_controller.get_by_id(session, transcript_id) + if not transcript: + raise Exception(f"Transcript {transcript_id} not found") + + # Create enhanced logger + tlogger = logger.bind(transcript_id=transcript.id) + + try: + # Pass session, transcript, and logger to the decorated function + return await func( + session, transcript=transcript, logger=tlogger, *args, **kwargs ) - if not transcript: - raise Exception(f"Transcript {transcript_id} not found") - - # Create enhanced logger with Celery task context - tlogger = logger.bind(transcript_id=transcript.id) - if current_task: - tlogger = tlogger.bind( - task_id=current_task.request.id, - task_name=current_task.name, - worker_hostname=current_task.request.hostname, - task_retries=current_task.request.retries, - transcript_id=transcript_id, - ) - - try: - # Pass session, transcript, and logger to the decorated function - return await func( - session, transcript=transcript, logger=tlogger, *args, **kwargs - ) - except Exception: - tlogger.exception("Error in task execution") - raise + except Exception: + tlogger.exception("Error in task execution") + raise + + return wrapper + + +def catch_exception(func: F) -> F: + """ + Decorator that catches exceptions and logs them using structlog. + """ + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception: + logger.exception( + "Exception caught in function execution", + func=func.__name__, + args=args, + kwargs=kwargs, + ) + raise return wrapper diff --git a/server/reflector/worker/taskiq_broker.py b/server/reflector/worker/taskiq_broker.py new file mode 100644 index 00000000..be1b1bf0 --- /dev/null +++ b/server/reflector/worker/taskiq_broker.py @@ -0,0 +1,76 @@ +""" +TaskIQ broker configuration for Reflector. + +This module provides a production-ready TaskIQ broker configuration that handles +both test and production environments correctly. It includes retry middleware +for 1:1 parity with Celery and proper logging setup. +""" + +import os + +import structlog +from taskiq import InMemoryBroker +from taskiq.middlewares import SimpleRetryMiddleware +from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker + +from reflector.settings import settings + +logger = structlog.get_logger(__name__) + + +def create_taskiq_broker(): + """ + Create and configure the TaskIQ broker based on environment. + + Returns: + Configured TaskIQ broker instance with appropriate backend and middleware. 
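+
+    Example (illustrative sketch; the task and argument names are hypothetical):
+
+        from reflector.worker.taskiq_broker import taskiq_broker
+
+        @taskiq_broker.task
+        async def my_task(transcript_id: str) -> None:
+            ...
+
+        # Enqueue from application code:
+        await my_task.kiq(transcript_id="abc")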
+ """ + env = os.environ.get("ENVIRONMENT") + + if env == "pytest": + # Test environment: Use InMemoryBroker with immediate execution + logger.info("Configuring TaskIQ InMemoryBroker for test environment") + broker = InMemoryBroker(await_inplace=True) + + else: + # Production environment: Use Redis broker with result backend + logger.info( + "Configuring TaskIQ RedisStreamBroker for production environment", + redis_url=settings.CELERY_BROKER_URL, + ) + + # Configure Redis result backend + result_backend = RedisAsyncResultBackend( + redis_url=settings.CELERY_BROKER_URL, + result_ex_time=86400, # Results expire after 24 hours + ) + + # Configure Redis stream broker + broker = RedisStreamBroker( + url=settings.CELERY_BROKER_URL, + stream_name="taskiq:stream", # Custom stream name for clarity + consumer_group="taskiq:workers", # Consumer group for load balancing + ).with_result_backend(result_backend) + + # Add retry middleware for production parity with Celery + # This provides automatic retries on task failures + retry_middleware = SimpleRetryMiddleware( + default_retry_count=3, # Match Celery's default retry behavior + ) + broker.add_middlewares(retry_middleware) + + logger.info( + "TaskIQ broker configured successfully", + broker_type=type(broker).__name__, + has_result_backend=hasattr(broker, "_result_backend"), + middleware_count=len(broker.middlewares), + ) + + return broker + + +# Create the global broker instance +taskiq_broker = create_taskiq_broker() + +# Export the broker for use in task definitions +__all__ = ["taskiq_broker"] diff --git a/server/reflector/worker/webhook.py b/server/reflector/worker/webhook.py index 81c2ecb2..de503f9e 100644 --- a/server/reflector/worker/webhook.py +++ b/server/reflector/worker/webhook.py @@ -8,18 +8,16 @@ from datetime import datetime, timezone import httpx import structlog -from celery import shared_task -from celery.utils.log import get_task_logger from sqlalchemy.ext.asyncio import AsyncSession from reflector.db.rooms import rooms_controller from reflector.db.transcripts import transcripts_controller -from reflector.pipelines.main_live_pipeline import asynctask from reflector.settings import settings from reflector.utils.webvtt import topics_to_webvtt +from reflector.worker.app import taskiq_broker from reflector.worker.session_decorator import with_session -logger = structlog.wrap_logger(get_task_logger(__name__)) +logger = structlog.get_logger(__name__) def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str: @@ -33,30 +31,23 @@ def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> s return hmac_obj.hexdigest() -@shared_task( - bind=True, - max_retries=30, - default_retry_delay=60, - retry_backoff=True, - retry_backoff_max=3600, # Max 1 hour between retries -) -@asynctask +@taskiq_broker.task @with_session -async def send_transcript_webhook( - self, +async def send_transcript_webhook_taskiq( transcript_id: str, room_id: str, event_id: str, session: AsyncSession, ): + retry_count = 0 + log = logger.bind( transcript_id=transcript_id, room_id=room_id, - retry_count=self.request.retries, + retry_count=retry_count, ) try: - # Fetch transcript and room transcript = await transcripts_controller.get_by_id(session, transcript_id) if not transcript: log.error("Transcript not found, skipping webhook") @@ -71,11 +62,9 @@ async def send_transcript_webhook( log.info("No webhook URL configured for room, skipping") return - # Generate WebVTT content from topics topics_data = [] if transcript.topics: - # 
Build topics data with diarized content per topic for topic in transcript.topics: topic_webvtt = topics_to_webvtt([topic]) if topic.words else "" topics_data.append( @@ -88,7 +77,6 @@ async def send_transcript_webhook( } ) - # Build webhook payload frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}" participants = [ {"id": p.id, "name": p.name, "speaker": p.speaker} @@ -120,16 +108,14 @@ async def send_transcript_webhook( }, } - # Convert to JSON payload_json = json.dumps(payload_data, separators=(",", ":")) payload_bytes = payload_json.encode("utf-8") - # Generate signature if secret is configured headers = { "Content-Type": "application/json", "User-Agent": "Reflector-Webhook/1.0", "X-Webhook-Event": "transcript.completed", - "X-Webhook-Retry": str(self.request.retries), + "X-Webhook-Retry": str(retry_count), } if room.webhook_secret: @@ -139,7 +125,6 @@ async def send_transcript_webhook( ) headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}" - # Send webhook with timeout async with httpx.AsyncClient(timeout=30.0) as client: log.info( "Sending webhook", @@ -165,26 +150,22 @@ async def send_transcript_webhook( log.error( "Webhook failed with HTTP error", status_code=e.response.status_code, - response_text=e.response.text[:500], # First 500 chars + response_text=e.response.text[:500], ) - # Don't retry on client errors (4xx) if 400 <= e.response.status_code < 500: log.error("Client error, not retrying") return - # Retry on server errors (5xx) - raise self.retry(exc=e) + raise except (httpx.ConnectError, httpx.TimeoutException) as e: - # Retry on network errors log.error("Webhook failed with connection error", error=str(e)) - raise self.retry(exc=e) + raise except Exception as e: - # Retry on unexpected errors log.exception("Unexpected error in webhook task", error=str(e)) - raise self.retry(exc=e) + raise async def test_webhook(room_id: str) -> dict: diff --git a/server/taskiq_migration_plan.md b/server/taskiq_migration_plan.md new file mode 100644 index 00000000..3a5ec20c --- /dev/null +++ b/server/taskiq_migration_plan.md @@ -0,0 +1,86 @@ +# TaskIQ Migration Implementation Plan + +## Phase 1: Core Infrastructure Setup + +### 1.1 Create TaskIQ Broker Configuration +- [ ] Create `reflector/worker/taskiq_broker.py` with broker setup +- [ ] Configure Redis broker with proper connection pooling +- [ ] Add retry middleware for 1:1 parity with Celery +- [ ] Setup test/production environment detection + +### 1.2 Session Management Utilities +- [ ] Create `get_session_context()` function in `reflector/db.py` +- [ ] Ensure `@with_session` decorator works with TaskIQ +- [ ] Verify test mocking works with new session approach + +## Phase 2: Simple Task Migration (Start Small) + +### 2.1 Migrate Single Tasks First +- [ ] `reflector/worker/cleanup.py` - 1 task, simple logic +- [ ] `reflector/worker/webhook.py` - 1 task with retry logic +- [ ] Test each migrated task individually + +### 2.2 Create Dual-Mode Tasks +- [ ] Keep Celery version with `@shared_task` +- [ ] Add TaskIQ version without `@asynctask` +- [ ] Use feature flag to switch between versions + +## Phase 3: Complex Pipeline Migration + +### 3.1 File Processing Pipeline +- [ ] Migrate `task_pipeline_file_process` completely +- [ ] Handle all sub-tasks in the pipeline +- [ ] Migrate chain/group/chord patterns to TaskIQ + +### 3.2 Live Processing Pipeline +- [ ] Migrate all 10 tasks in `main_live_pipeline.py` +- [ ] Convert complex chord patterns +- [ ] Ensure WebSocket notifications still work + +## Phase 4: 
Scheduled Tasks Migration + +### 4.1 Convert Celery Beat to TaskIQ Scheduler +- [ ] Create `reflector/worker/scheduler.py` +- [ ] Migrate all scheduled tasks +- [ ] Setup TaskIQ scheduler service + +## Phase 5: Testing Infrastructure + +### 5.1 Update Test Fixtures +- [ ] Create TaskIQ test fixtures in `conftest.py` +- [ ] Ensure dual-mode testing (both Celery and TaskIQ) +- [ ] Verify all existing tests pass + +### 5.2 Migration-Specific Tests +- [ ] Test session management across tasks +- [ ] Test retry logic parity +- [ ] Test scheduled task execution + +## Phase 6: Deployment & Monitoring + +### 6.1 Update Deployment Scripts +- [ ] Update Docker configurations +- [ ] Create TaskIQ worker startup scripts +- [ ] Setup health checks for TaskIQ + +### 6.2 Monitoring Setup +- [ ] Create TaskIQ metrics collection +- [ ] Setup alerting for failed tasks +- [ ] Create migration rollback plan + +## Execution Order + +1. **Week 1**: Phase 1 + Phase 2.1 +2. **Week 2**: Phase 2.2 + Phase 3.1 +3. **Week 3**: Phase 3.2 + Phase 4 +4. **Week 4**: Phase 5 +5. **Week 5**: Phase 6 + Testing +6. **Week 6**: Cutover + Monitoring + +## Success Metrics + +- All tests passing with TaskIQ +- No performance degradation +- Successful parallel running for 1 week +- Zero data loss during migration +- Rollback tested and documented \ No newline at end of file diff --git a/server/tests/conftest.py b/server/tests/conftest.py index c7c28415..93ebf2e1 100644 --- a/server/tests/conftest.py +++ b/server/tests/conftest.py @@ -1,7 +1,6 @@ import asyncio import os import sys -from tempfile import NamedTemporaryFile from unittest.mock import patch import pytest @@ -322,26 +321,60 @@ async def dummy_storage(): yield -@pytest.fixture(scope="session") -def celery_enable_logging(): - return True +# from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +# from sqlalchemy.orm import sessionmaker -@pytest.fixture(scope="session") -def celery_config(): - with NamedTemporaryFile() as f: - yield { - "broker_url": "memory://", - "result_backend": f"db+sqlite:///{f.name}", - } +# @pytest.fixture() +# async def db_connection(sqla_engine): +# connection = await sqla_engine.connect() +# try: +# yield connection +# finally: +# await connection.close() -@pytest.fixture(scope="session") -def celery_includes(): - return [ - "reflector.pipelines.main_live_pipeline", - "reflector.pipelines.main_file_pipeline", - ] +# @pytest.fixture() +# async def db_session_maker(db_connection): +# Session = async_sessionmaker( +# db_connection, +# expire_on_commit=False, +# class_=AsyncSession, +# ) +# yield Session + + +# @pytest.fixture() +# async def db_session(db_session_maker, db_connection): +# """ +# Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it +# after the test completes. 
+# """ +# session = db_session_maker( +# bind=db_connection, +# join_transaction_mode="create_savepoint", +# ) + +# try: +# yield session +# finally: +# await session.close() + + +# @pytest.fixture(autouse=True) +# async def ensure_db_session_in_app(db_connection, db_session_maker): +# async def mock_get_session(): +# session = db_session_maker( +# bind=db_connection, join_transaction_mode="create_savepoint" +# ) + +# try: +# yield session +# finally: +# await session.close() + +# with patch("reflector.db._get_session", side_effect=mock_get_session): +# yield @pytest.fixture(autouse=True) @@ -372,6 +405,18 @@ def fake_mp3_upload(): yield +@pytest.fixture +async def taskiq_broker(): + from reflector.worker.app import taskiq_broker + + await taskiq_broker.startup() + + try: + yield taskiq_broker + finally: + await taskiq_broker.shutdown() + + @pytest.fixture async def fake_transcript_with_topics(tmpdir, client, db_session): import shutil diff --git a/server/tests/test_ics_background_tasks.py b/server/tests/test_ics_background_tasks.py index dbbd152a..4639b116 100644 --- a/server/tests/test_ics_background_tasks.py +++ b/server/tests/test_ics_background_tasks.py @@ -130,15 +130,15 @@ async def test_sync_all_ics_calendars(db_session): ics_enabled=False, ) - with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay: + with patch("reflector.worker.ics_sync.sync_room_ics.kiq") as mock_kiq: ics_enabled_rooms = await rooms_controller.get_ics_enabled(db_session) for room in ics_enabled_rooms: if room and _should_sync(room): - sync_room_ics.delay(room.id) + await sync_room_ics.kiq(room.id) - assert mock_delay.call_count == 2 - called_room_ids = [call.args[0] for call in mock_delay.call_args_list] + assert mock_kiq.call_count == 2 + called_room_ids = [call.args[0] for call in mock_kiq.call_args_list] assert room1.id in called_room_ids assert room2.id in called_room_ids assert room3.id not in called_room_ids @@ -210,15 +210,15 @@ async def test_sync_respects_fetch_interval(db_session): {"ics_last_sync": now - timedelta(seconds=100)}, ) - with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay: + with patch("reflector.worker.ics_sync.sync_room_ics.kiq") as mock_kiq: ics_enabled_rooms = await rooms_controller.get_ics_enabled(db_session) for room in ics_enabled_rooms: if room and _should_sync(room): - sync_room_ics.delay(room.id) + await sync_room_ics.kiq(room.id) - assert mock_delay.call_count == 1 - assert mock_delay.call_args[0][0] == room2.id + assert mock_kiq.call_count == 1 + assert mock_kiq.call_args[0][0] == room2.id @pytest.mark.asyncio diff --git a/server/tests/test_transcripts_process.py b/server/tests/test_transcripts_process.py index facc21b7..a7504172 100644 --- a/server/tests/test_transcripts_process.py +++ b/server/tests/test_transcripts_process.py @@ -1,9 +1,11 @@ -import asyncio -import time +import os import pytest from httpx import ASGITransport, AsyncClient +# Set environment for TaskIQ to use InMemoryBroker +os.environ["ENVIRONMENT"] = "pytest" + @pytest.fixture async def app_lifespan(): @@ -23,8 +25,16 @@ async def client(app_lifespan): ) -@pytest.mark.usefixtures("celery_session_app") -@pytest.mark.usefixtures("celery_session_worker") +@pytest.fixture +async def taskiq_broker(): + from reflector.worker.app import taskiq_broker + + # Broker is already initialized as InMemoryBroker due to ENVIRONMENT=pytest + await taskiq_broker.startup() + yield taskiq_broker + await taskiq_broker.shutdown() + + @pytest.mark.asyncio async def test_transcript_process( 
tmpdir, @@ -34,7 +44,10 @@ async def test_transcript_process( dummy_file_diarization, dummy_storage, client, + taskiq_broker, + db_session, ): + print("IN TEST", db_session) # create a transcript response = await client.post("/transcripts", json={"name": "test"}) assert response.status_code == 200 @@ -55,18 +68,14 @@ async def test_transcript_process( assert response.status_code == 200 assert response.json()["status"] == "ok" - # wait for processing to finish (max 1 minute) - timeout_seconds = 60 - start_time = time.monotonic() - while (time.monotonic() - start_time) < timeout_seconds: - # fetch the transcript and check if it is ended - resp = await client.get(f"/transcripts/{tid}") - assert resp.status_code == 200 - if resp.json()["status"] in ("ended", "error"): - break - await asyncio.sleep(1) - else: - pytest.fail(f"Initial processing timed out after {timeout_seconds} seconds") + # Wait for all tasks to complete since we're using InMemoryBroker + await taskiq_broker.wait_all() + + # Ensure it's finished ok + resp = await client.get(f"/transcripts/{tid}") + assert resp.status_code == 200 + print(resp.json()) + assert resp.json()["status"] in ("ended", "error") # restart the processing response = await client.post( @@ -74,20 +83,15 @@ async def test_transcript_process( ) assert response.status_code == 200 assert response.json()["status"] == "ok" - await asyncio.sleep(2) - # wait for processing to finish (max 1 minute) - timeout_seconds = 60 - start_time = time.monotonic() - while (time.monotonic() - start_time) < timeout_seconds: - # fetch the transcript and check if it is ended - resp = await client.get(f"/transcripts/{tid}") - assert resp.status_code == 200 - if resp.json()["status"] in ("ended", "error"): - break - await asyncio.sleep(1) - else: - pytest.fail(f"Restart processing timed out after {timeout_seconds} seconds") + # Wait for all tasks to complete since we're using InMemoryBroker + await taskiq_broker.wait_all() + + # Ensure it's finished ok + resp = await client.get(f"/transcripts/{tid}") + assert resp.status_code == 200 + print(resp.json()) + assert resp.json()["status"] in ("ended", "error") # check the transcript is ended transcript = resp.json() diff --git a/server/tests/test_transcripts_rtc_ws.py b/server/tests/test_transcripts_rtc_ws.py index f1cf01b4..817cadb1 100644 --- a/server/tests/test_transcripts_rtc_ws.py +++ b/server/tests/test_transcripts_rtc_ws.py @@ -49,7 +49,7 @@ class ThreadedUvicorn: @pytest.fixture -def appserver(tmpdir, database, celery_session_app, celery_session_worker): +def appserver(tmpdir, database): import threading from reflector.app import app @@ -111,8 +111,6 @@ def appserver(tmpdir, database, celery_session_app, celery_session_worker): settings.DATA_DIR = DATA_DIR -@pytest.mark.usefixtures("celery_session_app") -@pytest.mark.usefixtures("celery_session_worker") @pytest.mark.asyncio async def test_transcript_rtc_and_websocket( tmpdir, @@ -275,8 +273,6 @@ async def test_transcript_rtc_and_websocket( assert audio_resp.headers["Content-Type"] == "audio/mpeg" -@pytest.mark.usefixtures("celery_session_app") -@pytest.mark.usefixtures("celery_session_worker") @pytest.mark.asyncio async def test_transcript_rtc_and_websocket_and_fr( tmpdir, diff --git a/server/tests/test_transcripts_upload.py b/server/tests/test_transcripts_upload.py index e6a9a6a6..ce2c2aef 100644 --- a/server/tests/test_transcripts_upload.py +++ b/server/tests/test_transcripts_upload.py @@ -4,8 +4,6 @@ import time import pytest -@pytest.mark.usefixtures("celery_session_app") 
-@pytest.mark.usefixtures("celery_session_worker") @pytest.mark.asyncio async def test_transcript_upload_file( tmpdir, diff --git a/server/uv.lock b/server/uv.lock index 3f39924f..da3e1414 100644 --- a/server/uv.lock +++ b/server/uv.lock @@ -210,18 +210,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c2/62/96b5217b742805236614f05904541000f55422a6060a90d7fd4ce26c172d/alembic-1.16.4-py3-none-any.whl", hash = "sha256:b05e51e8e82efc1abd14ba2af6392897e145930c3e0a2faf2b0da2f7f7fd660d", size = 247026 }, ] -[[package]] -name = "amqp" -version = "5.3.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "vine" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/79/fc/ec94a357dfc6683d8c86f8b4cfa5416a4c36b28052ec8260c77aca96a443/amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432", size = 129013 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/26/99/fc813cd978842c26c82534010ea849eee9ab3a13ea2b74e95cb9c99e747b/amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", size = 50944 }, -] - [[package]] name = "annotated-types" version = "0.7.0" @@ -375,15 +363,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/50/cd/30110dc0ffcf3b131156077b90e9f60ed75711223f306da4db08eff8403b/beautifulsoup4-4.13.4-py3-none-any.whl", hash = "sha256:9bbbb14bfde9d79f38b8cd5f8c7c85f4b8f2523190ebed90e950a8dea4cb1c4b", size = 187285 }, ] -[[package]] -name = "billiard" -version = "4.2.1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/7c/58/1546c970afcd2a2428b1bfafecf2371d8951cc34b46701bea73f4280989e/billiard-4.2.1.tar.gz", hash = "sha256:12b641b0c539073fc8d3f5b8b7be998956665c4233c7c1fcd66a7e677c4fb36f", size = 155031 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/30/da/43b15f28fe5f9e027b41c539abc5469052e9d48fd75f8ff094ba2a0ae767/billiard-4.2.1-py3-none-any.whl", hash = "sha256:40b59a4ac8806ba2c2369ea98d876bc6108b051c227baffd928c644d15d8f3cb", size = 86766 }, -] - [[package]] name = "black" version = "24.1.1" @@ -436,25 +415,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7e/83/a753562020b69fa90cebc39e8af2c753b24dcdc74bee8355ee3f6cefdf34/botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8", size = 13580545 }, ] -[[package]] -name = "celery" -version = "5.5.3" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "billiard" }, - { name = "click" }, - { name = "click-didyoumean" }, - { name = "click-plugins" }, - { name = "click-repl" }, - { name = "kombu" }, - { name = "python-dateutil" }, - { name = "vine" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/bb/7d/6c289f407d219ba36d8b384b42489ebdd0c84ce9c413875a8aae0c85f35b/celery-5.5.3.tar.gz", hash = "sha256:6c972ae7968c2b5281227f01c3a3f984037d21c5129d07bf3550cc2afc6b10a5", size = 1667144 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c9/af/0dcccc7fdcdf170f9a1585e5e96b6fb0ba1749ef6be8c89a6202284759bd/celery-5.5.3-py3-none-any.whl", hash = "sha256:0b5761a07057acee94694464ca482416b959568904c9dfa41ce8413a7d65d525", size = 438775 }, -] - [[package]] name = "certifi" version = "2025.7.14" @@ -545,43 +505,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = 
"sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215 }, ] -[[package]] -name = "click-didyoumean" -version = "0.3.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "click" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/30/ce/217289b77c590ea1e7c24242d9ddd6e249e52c795ff10fac2c50062c48cb/click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463", size = 3089 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/1b/5b/974430b5ffdb7a4f1941d13d83c64a0395114503cc357c6b9ae4ce5047ed/click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c", size = 3631 }, -] - -[[package]] -name = "click-plugins" -version = "1.1.1.2" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "click" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/c3/a4/34847b59150da33690a36da3681d6bbc2ec14ee9a846bc30a6746e5984e4/click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261", size = 8343 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/3d/9a/2abecb28ae875e39c8cad711eb1186d8d14eab564705325e77e4e6ab9ae5/click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6", size = 11051 }, -] - -[[package]] -name = "click-repl" -version = "0.3.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "click" }, - { name = "prompt-toolkit" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/cb/a2/57f4ac79838cfae6912f997b4d1a64a858fb0c86d7fcaae6f7b58d267fca/click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9", size = 10449 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/52/40/9d857001228658f0d59e97ebd4c346fe73e138c6de1bce61dc568a57c7f8/click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812", size = 10289 }, -] - [[package]] name = "colorama" version = "0.4.6" @@ -775,23 +698,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c3/be/d0d44e092656fe7a06b55e6103cbce807cdbdee17884a5367c68c9860853/dataclasses_json-0.6.7-py3-none-any.whl", hash = "sha256:0dbf33f26c8d5305befd61b39d2b3414e8a407bedc2834dea9b8d642666fb40a", size = 28686 }, ] -[[package]] -name = "debugpy" -version = "1.8.15" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/8c/8b/3a9a28ddb750a76eaec445c7f4d3147ea2c579a97dbd9e25d39001b92b21/debugpy-1.8.15.tar.gz", hash = "sha256:58d7a20b7773ab5ee6bdfb2e6cf622fdf1e40c9d5aef2857d85391526719ac00", size = 1643279 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d2/b3/1c44a2ed311199ab11c2299c9474a6c7cd80d19278defd333aeb7c287995/debugpy-1.8.15-cp311-cp311-macosx_14_0_universal2.whl", hash = "sha256:babc4fb1962dd6a37e94d611280e3d0d11a1f5e6c72ac9b3d87a08212c4b6dd3", size = 2183442 }, - { url = "https://files.pythonhosted.org/packages/f6/69/e2dcb721491e1c294d348681227c9b44fb95218f379aa88e12a19d85528d/debugpy-1.8.15-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f778e68f2986a58479d0ac4f643e0b8c82fdd97c2e200d4d61e7c2d13838eb53", size = 3134215 }, - { url = 
"https://files.pythonhosted.org/packages/17/76/4ce63b95d8294dcf2fd1820860b300a420d077df4e93afcaa25a984c2ca7/debugpy-1.8.15-cp311-cp311-win32.whl", hash = "sha256:f9d1b5abd75cd965e2deabb1a06b0e93a1546f31f9f621d2705e78104377c702", size = 5154037 }, - { url = "https://files.pythonhosted.org/packages/c2/a7/e5a7c784465eb9c976d84408873d597dc7ce74a0fc69ed009548a1a94813/debugpy-1.8.15-cp311-cp311-win_amd64.whl", hash = "sha256:62954fb904bec463e2b5a415777f6d1926c97febb08ef1694da0e5d1463c5c3b", size = 5178133 }, - { url = "https://files.pythonhosted.org/packages/ab/4a/4508d256e52897f5cdfee6a6d7580974811e911c6d01321df3264508a5ac/debugpy-1.8.15-cp312-cp312-macosx_14_0_universal2.whl", hash = "sha256:3dcc7225cb317469721ab5136cda9ff9c8b6e6fb43e87c9e15d5b108b99d01ba", size = 2511197 }, - { url = "https://files.pythonhosted.org/packages/99/8d/7f6ef1097e7fecf26b4ef72338d08e41644a41b7ee958a19f494ffcffc29/debugpy-1.8.15-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:047a493ca93c85ccede1dbbaf4e66816794bdc214213dde41a9a61e42d27f8fc", size = 4229517 }, - { url = "https://files.pythonhosted.org/packages/3f/e8/e8c6a9aa33a9c9c6dacbf31747384f6ed2adde4de2e9693c766bdf323aa3/debugpy-1.8.15-cp312-cp312-win32.whl", hash = "sha256:b08e9b0bc260cf324c890626961dad4ffd973f7568fbf57feb3c3a65ab6b6327", size = 5276132 }, - { url = "https://files.pythonhosted.org/packages/e9/ad/231050c6177b3476b85fcea01e565dac83607b5233d003ff067e2ee44d8f/debugpy-1.8.15-cp312-cp312-win_amd64.whl", hash = "sha256:e2a4fe357c92334272eb2845fcfcdbec3ef9f22c16cf613c388ac0887aed15fa", size = 5317645 }, - { url = "https://files.pythonhosted.org/packages/07/d5/98748d9860e767a1248b5e31ffa7ce8cb7006e97bf8abbf3d891d0a8ba4e/debugpy-1.8.15-py2.py3-none-any.whl", hash = "sha256:bce2e6c5ff4f2e00b98d45e7e01a49c7b489ff6df5f12d881c67d2f1ac635f3d", size = 5282697 }, -] - [[package]] name = "defusedxml" version = "0.7.1" @@ -840,20 +746,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/68/1b/e0a87d256e40e8c888847551b20a017a6b98139178505dc7ffb96f04e954/dnspython-2.7.0-py3-none-any.whl", hash = "sha256:b4c34b7d10b51bcc3a5071e7b8dee77939f1e878477eeecc965e9835f63c6c86", size = 313632 }, ] -[[package]] -name = "docker" -version = "7.1.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "pywin32", marker = "sys_platform == 'win32'" }, - { name = "requests" }, - { name = "urllib3" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/91/9b/4a2ea29aeba62471211598dac5d96825bb49348fa07e906ea930394a83ce/docker-7.1.0.tar.gz", hash = "sha256:ad8c70e6e3f8926cb8a92619b832b4ea5299e2831c14284663184e200546fa6c", size = 117834 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/e3/26/57c6fb270950d476074c087527a558ccb6f4436657314bfb6cdf484114c4/docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0", size = 147774 }, -] - [[package]] name = "docopt" version = "0.6.2" @@ -1319,6 +1211,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9c/1f/19ebc343cc71a7ffa78f17018535adc5cbdd87afb31d7c34874680148b32/ifaddr-0.2.0-py3-none-any.whl", hash = "sha256:085e0305cfe6f16ab12d72e2024030f5d52674afad6911bb1eee207177b8a748", size = 12314 }, ] +[[package]] +name = "importlib-metadata" +version = "8.7.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "zipp" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/76/66/650a33bd90f786193e4de4b3ad86ea60b53c89b669a5c7be931fac31cdb0/importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000", size = 56641 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/b0/36bd937216ec521246249be3bf9855081de4c5e06a0c9b4219dbeda50373/importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd", size = 27656 }, +] + [[package]] name = "iniconfig" version = "2.1.0" @@ -1328,6 +1232,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050 }, ] +[[package]] +name = "izulu" +version = "0.50.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/58/6d6335c78b7ade54d8a6c6dbaa589e5c21b3fd916341d5a16f774c72652a/izulu-0.50.0.tar.gz", hash = "sha256:cc8e252d5e8560c70b95380295008eeb0786f7b745a405a40d3556ab3252d5f5", size = 48558 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4a/9f/bf9d33546bbb6e5e80ebafe46f90b7d8b4a77410b7b05160b0ca8978c15a/izulu-0.50.0-py3-none-any.whl", hash = "sha256:4e9ae2508844e7c5f62c468a8b9e2deba2f60325ef63f01e65b39fd9a6b3fab4", size = 18095 }, +] + [[package]] name = "jinja2" version = "3.1.6" @@ -1479,21 +1392,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/da/e9/0d4add7873a73e462aeb45c036a2dead2562b825aa46ba326727b3f31016/kiwisolver-1.4.9-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:fb940820c63a9590d31d88b815e7a3aa5915cad3ce735ab45f0c730b39547de1", size = 73929 }, ] -[[package]] -name = "kombu" -version = "5.5.4" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "amqp" }, - { name = "packaging" }, - { name = "tzdata" }, - { name = "vine" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/0f/d3/5ff936d8319ac86b9c409f1501b07c426e6ad41966fedace9ef1b966e23f/kombu-5.5.4.tar.gz", hash = "sha256:886600168275ebeada93b888e831352fe578168342f0d1d5833d88ba0d847363", size = 461992 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ef/70/a07dcf4f62598c8ad579df241af55ced65bed76e42e45d3c368a6d82dbc1/kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8", size = 210034 }, -] - [[package]] name = "levenshtein" version = "0.27.1" @@ -2305,18 +2203,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/72/0824c18f3bc75810f55dacc2dd933f6ec829771180245ae3cc976195dec0/prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9", size = 19296 }, ] -[[package]] -name = "prompt-toolkit" -version = "3.0.51" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "wcwidth" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/bb/6e/9d084c929dfe9e3bfe0c6a47e31f78a25c54627d64a66e884a8bf5474f1c/prompt_toolkit-3.0.51.tar.gz", hash = "sha256:931a162e3b27fc90c86f1b48bb1fb2c528c2761475e57c9c06de13311c7b54ed", size = 428940 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ce/4f/5249960887b1fbe561d9ff265496d170b55a735b76724f10ef19f9e40716/prompt_toolkit-3.0.51-py3-none-any.whl", hash = "sha256:52742911fde84e2d423e2f9a4cf1de7d7ac4e51958f648d9540e0fb8db077b07", size = 
387810 }, -] - [[package]] name = "propcache" version = "0.3.2" @@ -2372,21 +2258,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724 }, ] -[[package]] -name = "psutil" -version = "7.0.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051 }, - { url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535 }, - { url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004 }, - { url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986 }, - { url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544 }, - { url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053 }, - { url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885 }, -] - [[package]] name = "psycopg2-binary" version = "2.9.10" @@ -2539,6 +2410,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552 }, ] +[[package]] +name = "pycron" +version = "3.2.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/8e/5d/340be12ae4a69c33102dfb6ddc1dc6e53e69b2d504fa26b5d34a472c3057/pycron-3.2.0.tar.gz", hash = "sha256:e125a28aca0295769541a40633f70b602579df48c9cb357c36c28d2628ba2b13", size = 4248 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f0/76/caf316909f4545e7158e0e1defd8956a1da49f4af04f5d16b18c358dfeac/pycron-3.2.0-py3-none-any.whl", hash = 
"sha256:6d2349746270bd642b71b9f7187cf13f4d9ee2412b4710396a507b5fe4f60dac", size = 4904 }, +] + [[package]] name = "pydantic" version = "2.11.7" @@ -2782,25 +2662,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c7/9d/bf86eddabf8c6c9cb1ea9a869d6873b46f105a5d292d3a6f7071f5b07935/pytest_asyncio-1.1.0-py3-none-any.whl", hash = "sha256:5fe2d69607b0bd75c656d1211f969cadba035030156745ee09e7d71740e58ecf", size = 15157 }, ] -[[package]] -name = "pytest-celery" -version = "1.2.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "celery" }, - { name = "debugpy" }, - { name = "docker" }, - { name = "kombu" }, - { name = "psutil" }, - { name = "pytest-docker-tools" }, - { name = "setuptools" }, - { name = "tenacity" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/f4/a0/fcc6a3fbd8c06d04d206da27e1062eaee8ddc0cbaf2db72e66aa74f240e8/pytest_celery-1.2.0.tar.gz", hash = "sha256:de605eca1b0134c136910c8ed161cc3996b0c8aaafd29170878a396eed81b5b1", size = 28043 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/0a/f2/b16389f7ceccd96fd887762626c1509cdd39fdc8f7d4cec59cb1e9716df5/pytest_celery-1.2.0-py3-none-any.whl", hash = "sha256:d81d22a3bed21eb180fa2ee2dd701a00cc4f7c3f1d578e99620c887cad331fb6", size = 49040 }, -] - [[package]] name = "pytest-cov" version = "6.2.1" @@ -2828,19 +2689,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c5/c7/e057e0d1de611ce1bbb26cccf07ddf56eb30a6f6a03aa512a09dac356e03/pytest_docker-3.2.3-py3-none-any.whl", hash = "sha256:f973c35e6f2b674c8fc87e8b3354b02c15866a21994c0841a338c240a05de1eb", size = 8585 }, ] -[[package]] -name = "pytest-docker-tools" -version = "3.1.9" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "docker" }, - { name = "pytest" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/a6/2a/4d68472c2bac257c4e7389b5ca3a6b6cd7da88bce012bed321fdd31372ae/pytest_docker_tools-3.1.9.tar.gz", hash = "sha256:1b6a0cb633c20145731313335ef15bcf5571839c06726764e60cbe495324782b", size = 42824 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/57/79/9dae84c244dabebca6a952e098d6ac9d13719b701fc5323ba6d00abc675a/pytest_docker_tools-3.1.9-py2.py3-none-any.whl", hash = "sha256:36f8e88d56d84ea177df68a175673681243dd991d2807fbf551d90f60341bfdb", size = 29268 }, -] - [[package]] name = "pytest-env" version = "1.1.5" @@ -2974,19 +2822,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225 }, ] -[[package]] -name = "pywin32" -version = "311" -source = { registry = "https://pypi.org/simple" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/7c/af/449a6a91e5d6db51420875c54f6aff7c97a86a3b13a0b4f1a5c13b988de3/pywin32-311-cp311-cp311-win32.whl", hash = "sha256:184eb5e436dea364dcd3d2316d577d625c0351bf237c4e9a5fabbcfa5a58b151", size = 8697031 }, - { url = "https://files.pythonhosted.org/packages/51/8f/9bb81dd5bb77d22243d33c8397f09377056d5c687aa6d4042bea7fbf8364/pywin32-311-cp311-cp311-win_amd64.whl", hash = "sha256:3ce80b34b22b17ccbd937a6e78e7225d80c52f5ab9940fe0506a1a16f3dab503", size = 9508308 }, - { url = "https://files.pythonhosted.org/packages/44/7b/9c2ab54f74a138c491aba1b1cd0795ba61f144c711daea84a88b63dc0f6c/pywin32-311-cp311-cp311-win_arm64.whl", hash = "sha256:a733f1388e1a842abb67ffa8e7aad0e70ac519e09b0f6a784e65a136ec7cefd2", size = 
8703930 }, - { url = "https://files.pythonhosted.org/packages/e7/ab/01ea1943d4eba0f850c3c61e78e8dd59757ff815ff3ccd0a84de5f541f42/pywin32-311-cp312-cp312-win32.whl", hash = "sha256:750ec6e621af2b948540032557b10a2d43b0cee2ae9758c54154d711cc852d31", size = 8706543 }, - { url = "https://files.pythonhosted.org/packages/d1/a8/a0e8d07d4d051ec7502cd58b291ec98dcc0c3fff027caad0470b72cfcc2f/pywin32-311-cp312-cp312-win_amd64.whl", hash = "sha256:b8c095edad5c211ff31c05223658e71bf7116daa0ecf3ad85f3201ea3190d067", size = 9495040 }, - { url = "https://files.pythonhosted.org/packages/ba/3a/2ae996277b4b50f17d61f0603efd8253cb2d79cc7ae159468007b586396d/pywin32-311-cp312-cp312-win_arm64.whl", hash = "sha256:e286f46a9a39c4a18b319c28f59b61de793654af2f395c102b4f819e584b5852", size = 8710102 }, -] - [[package]] name = "pyyaml" version = "6.0.2" @@ -3094,7 +2929,6 @@ dependencies = [ { name = "alembic" }, { name = "asyncpg" }, { name = "av" }, - { name = "celery" }, { name = "fastapi", extra = ["standard"] }, { name = "fastapi-pagination" }, { name = "httpx" }, @@ -3118,6 +2952,8 @@ dependencies = [ { name = "sortedcontainers" }, { name = "sqlalchemy" }, { name = "structlog" }, + { name = "taskiq" }, + { name = "taskiq-redis" }, { name = "transformers" }, { name = "uvicorn", extra = ["standard"] }, { name = "webvtt-py" }, @@ -3156,7 +2992,6 @@ tests = [ { name = "pytest" }, { name = "pytest-aiohttp" }, { name = "pytest-asyncio" }, - { name = "pytest-celery" }, { name = "pytest-cov" }, { name = "pytest-docker" }, { name = "pytest-httpx" }, @@ -3171,7 +3006,6 @@ requires-dist = [ { name = "alembic", specifier = ">=1.11.3" }, { name = "asyncpg", specifier = ">=0.29.0" }, { name = "av", specifier = ">=10.0.0" }, - { name = "celery", specifier = ">=5.3.4" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" }, { name = "fastapi-pagination", specifier = ">=0.12.6" }, { name = "httpx", specifier = ">=0.24.1" }, @@ -3195,6 +3029,8 @@ requires-dist = [ { name = "sortedcontainers", specifier = ">=2.4.0" }, { name = "sqlalchemy", specifier = ">=2.0.0" }, { name = "structlog", specifier = ">=23.1.0" }, + { name = "taskiq", specifier = ">=0.11.18" }, + { name = "taskiq-redis", specifier = ">=1.1.0" }, { name = "transformers", specifier = ">=4.36.2" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.23.1" }, { name = "webvtt-py", specifier = ">=0.5.0" }, @@ -3229,7 +3065,6 @@ tests = [ { name = "pytest", specifier = ">=7.4.0" }, { name = "pytest-aiohttp", specifier = ">=1.0.4" }, { name = "pytest-asyncio", specifier = ">=0.21.1" }, - { name = "pytest-celery", specifier = ">=0.0.0" }, { name = "pytest-cov", specifier = ">=4.1.0" }, { name = "pytest-docker", specifier = ">=3.2.3" }, { name = "pytest-httpx", specifier = ">=0.23.1" }, @@ -3807,6 +3642,48 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252 }, ] +[[package]] +name = "taskiq" +version = "0.11.18" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "importlib-metadata" }, + { name = "izulu" }, + { name = "packaging" }, + { name = "pycron" }, + { name = "pydantic" }, + { name = "pytz" }, + { name = "taskiq-dependencies" }, + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/91/4d/0d1b3b6c77a45d7a8c685a9c916b2532cca36a26771831949b874f6d15c3/taskiq-0.11.18.tar.gz", hash = "sha256:b83e1b70aee74d0a197d4a4a5ba165b8ba85b12a2b3b7ebfa3c6fdcc9e3128a7", size = 54323 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4a/d5/46505f57c140d10d4c36f6bd2f2047fb0460e4d5b9b841dc3b93ab8c893d/taskiq-0.11.18-py3-none-any.whl", hash = "sha256:0df58be24e4ef5d19c8ef02581d35d392b0d780d3fe37950e0478022b85ce288", size = 79608 }, +] + +[[package]] +name = "taskiq-dependencies" +version = "1.5.7" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/47/90/47a627696e53bfdcacabc3e8c05b73bf1424685bcb5f17209cb8b12da1bf/taskiq_dependencies-1.5.7.tar.gz", hash = "sha256:0d3b240872ef152b719153b9526d866d2be978aeeaea6600e878414babc2dcb4", size = 14875 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/99/6d/4a012f2de002c2e93273f5e7d3e3feea02f7fdbb7b75ca2ca1dd10703091/taskiq_dependencies-1.5.7-py3-none-any.whl", hash = "sha256:6fcee5d159bdb035ef915d4d848826169b6f06fe57cc2297a39b62ea3e76036f", size = 13801 }, +] + +[[package]] +name = "taskiq-redis" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "redis" }, + { name = "taskiq" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/59/73/ce7dd796950f792c94ea7e59609ff0ab5205fea72bbb60054be6f8757b1f/taskiq_redis-1.1.0.tar.gz", hash = "sha256:c10bac567cce0b83caa22640da54305d6ddb74ad1324e638367e1b2bd46f682d", size = 15770 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/24/f9/0622ce1c5100b008b34cf7719a576491c1a0f22f50f8db090d797d8de6dc/taskiq_redis-1.1.0-py3-none-any.whl", hash = "sha256:516abe3cd703a7d97a5c0979102082e295d6cf2396a43b1c572382798df221cd", size = 20095 }, +] + [[package]] name = "tenacity" version = "9.1.2" @@ -4255,15 +4132,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/13/5d/1f15b252890c968d42b348d1e9b0aa12d5bf3e776704178ec37cceccdb63/vcrpy-7.0.0-py2.py3-none-any.whl", hash = "sha256:55791e26c18daa363435054d8b35bd41a4ac441b6676167635d1b37a71dbe124", size = 42321 }, ] -[[package]] -name = "vine" -version = "5.1.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/bd/e4/d07b5f29d283596b9727dd5275ccbceb63c44a1a82aa9e4bfd20426762ac/vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0", size = 48980 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/03/ff/7c0c86c43b3cbb927e0ccc0255cb4057ceba4799cd44ae95174ce8e8b5b2/vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc", size = 9636 }, -] - [[package]] name = "watchfiles" version = "1.1.0" @@ -4305,15 +4173,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bd/d3/254cea30f918f489db09d6a8435a7de7047f8cb68584477a515f160541d6/watchfiles-1.1.0-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:923fec6e5461c42bd7e3fd5ec37492c6f3468be0499bc0707b4bbbc16ac21792", size = 454009 }, ] -[[package]] -name = "wcwidth" -version = "0.2.13" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/6c/63/53559446a878410fc5a5974feb13d31d78d752eb18aeba59c7fef1af7598/wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5", size = 101301 } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166 }, -] - [[package]] name = "websockets" version = "15.0.1" @@ -4444,3 +4303,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/eb/83/5d9092950565481b413b31a23e75dd3418ff0a277d6e0abf3729d4d1ce25/yarl-1.20.1-cp312-cp312-win_amd64.whl", hash = "sha256:48ea7d7f9be0487339828a4de0360d7ce0efc06524a48e1810f945c45b813698", size = 86710 }, { url = "https://files.pythonhosted.org/packages/b4/2d/2345fce04cfd4bee161bf1e7d9cdc702e3e16109021035dbb24db654a622/yarl-1.20.1-py3-none-any.whl", hash = "sha256:83b8eb083fe4683c6115795d9fc1cfaf2cbbefb19b3a1cb68f6527460f483a77", size = 46542 }, ] + +[[package]] +name = "zipp" +version = "3.23.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276 }, +]