
Celery to TaskIQ Migration Guide

Executive Summary

This document outlines the migration path from Celery to TaskIQ for the Reflector project. TaskIQ is a modern, async-first distributed task queue that provides similar functionality to Celery while being designed specifically for async Python applications.

Current Celery Usage Analysis

Key Patterns in Use

  1. Task Decorators: @shared_task, @asynctask, @with_session
  2. Task Invocation: .delay(), .si() for signatures
  3. Workflow Patterns: chain(), group(), chord() for complex pipelines
  4. Scheduled Tasks: Celery Beat with crontab and periodic schedules
  5. Session Management: Custom @with_session and @with_session_and_transcript decorators
  6. Retry Logic: Auto-retry with exponential backoff
  7. Redis Backend: Using Redis for broker and result backend

Critical Files to Migrate

  • reflector/worker/app.py - Celery app configuration and beat schedule
  • reflector/worker/session_decorator.py - Session management decorators
  • reflector/pipelines/main_file_pipeline.py - File processing pipeline
  • reflector/pipelines/main_live_pipeline.py - Live streaming pipeline (10 tasks)
  • reflector/worker/process.py - Background processing tasks
  • reflector/worker/ics_sync.py - Calendar sync tasks
  • reflector/worker/cleanup.py - Cleanup tasks
  • reflector/worker/webhook.py - Webhook notifications

TaskIQ Architecture Mapping

1. Installation

# Remove Celery dependencies
uv remove celery flower

# Install TaskIQ with Redis support
uv add taskiq taskiq-redis taskiq-pipelines

2. Broker Configuration

Current (Celery)

# reflector/worker/app.py
from celery import Celery

app = Celery(
    "reflector",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=[...],
)

New (TaskIQ)

# reflector/worker/broker.py
import os

from taskiq.middlewares import SimpleRetryMiddleware
from taskiq_pipelines import PipelineMiddleware
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

result_backend = RedisAsyncResultBackend(
    redis_url=settings.REDIS_URL,
    result_ex_time=86400,  # expire results after 24 hours
)

broker = RedisStreamBroker(
    url=settings.REDIS_URL,
    max_connection_pool_size=10,
).with_result_backend(result_backend).with_middlewares(
    PipelineMiddleware(),  # For chain/group/chord support
    SimpleRetryMiddleware(default_retry_count=3),
)

# For testing environment
if os.environ.get("ENVIRONMENT") == "pytest":
    from taskiq import InMemoryBroker

    broker = InMemoryBroker(await_inplace=True)

3. Task Definition Migration

Current (Celery)

@shared_task
@asynctask
@with_session
async def task_pipeline_file_process(session: AsyncSession, transcript_id: str):
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()

New (TaskIQ)

from reflector.db import get_session
from reflector.worker.broker import broker

@broker.task
async def task_pipeline_file_process(transcript_id: str):
    # Use get_session for proper test mocking
    async for session in get_session():
        pipeline = PipelineMainFile(transcript_id=transcript_id)
        await pipeline.process()

4. Session Management

Current Session Decorators (Keep Using These!)

# reflector/worker/session_decorator.py
def with_session(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        async with get_session_context() as session:
            return await func(session, *args, **kwargs)
    return wrapper

Session Management Strategy

⚠️ CRITICAL: The key insight is to maintain consistent session management patterns:

  1. For Worker Tasks: Continue using @with_session decorator pattern
  2. For FastAPI endpoints: Use get_session dependency injection
  3. Never use get_session_factory() directly in application code

# APPROACH 1: Simple migration keeping decorator pattern
from reflector.worker.session_decorator import with_session

@taskiq_broker.task
@with_session
async def task_pipeline_file_process(session, *, transcript_id: str):
    # Session is provided by decorator, just like Celery version
    transcript = await transcripts_controller.get_by_id(session, transcript_id)
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()

# APPROACH 2: For test compatibility without decorator
from reflector.db import get_session

@taskiq_broker.task
async def task_pipeline_file_process(transcript_id: str):
    # Use get_session which is mocked in tests
    async for session in get_session():
        transcript = await transcripts_controller.get_by_id(session, transcript_id)
        pipeline = PipelineMainFile(transcript_id=transcript_id)
        await pipeline.process()

# APPROACH 3: Future - TaskIQ dependency injection (after full migration)
from taskiq import TaskiqDepends

async def get_session_context():
    """Context manager version of get_session for consistency"""
    async for session in get_session():
        yield session

@taskiq_broker.task
async def task_pipeline_file_process(
    transcript_id: str,
    session: AsyncSession = TaskiqDepends(get_session_context)
):
    transcript = await transcripts_controller.get_by_id(session, transcript_id)
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()

Key Points:

  • @with_session decorator works with TaskIQ tasks (remove @asynctask, keep @with_session)
  • For testing: get_session() from reflector.db is properly mocked
  • Never call get_session_factory() directly - always use the abstractions

5. Task Invocation

Current (Celery)

# Simple async execution
task_pipeline_file_process.delay(transcript_id=transcript.id)

# With signature for chaining
task_cleanup_consent.si(transcript_id=transcript_id)

New (TaskIQ)

# Simple async execution
await task_pipeline_file_process.kiq(transcript_id=transcript.id)

# With kicker for advanced configuration
await task_cleanup_consent.kicker().with_labels(
    priority="high"
).kiq(transcript_id=transcript_id)
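
Note that .kiq() returns a task handle, not the task's return value. Because the broker is configured with a Redis result backend, the result can be awaited explicitly. A minimal sketch (the 60-second timeout is an arbitrary choice):

# Kick the task, then wait for its result via the result backend
task = await task_pipeline_file_process.kiq(transcript_id=transcript.id)
result = await task.wait_result(timeout=60)
if result.is_err:
    raise result.error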

6. Workflow Patterns (Chain, Group, Chord)

Current (Celery)

from celery import chain, group, chord

# Chain example
post_chain = chain(
    task_cleanup_consent.si(transcript_id=transcript_id),
    task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
    task_send_webhook_if_needed.si(transcript_id=transcript_id),
)

# Chord example (parallel + callback)
workflow = chord(
    group(chain_mp3_and_diarize, chain_title_preview),
    chain_final_summaries,
) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)

New (TaskIQ with Pipelines)

import asyncio

from taskiq_pipelines import Pipeline

# Chain example using Pipeline
post_pipeline = (
    Pipeline(broker, task_cleanup_consent)
    .call_next(task_pipeline_post_to_zulip, transcript_id=transcript_id)
    .call_next(task_send_webhook_if_needed, transcript_id=transcript_id)
)
await post_pipeline.kiq(transcript_id=transcript_id)

# Parallel execution: kick both tasks, then await both results
# (TaskIQ has no gather(); use asyncio.gather over wait_result)
mp3_task = await chain_mp3_and_diarize.kiq(transcript_id)
title_task = await chain_title_preview.kiq(transcript_id)
results = await asyncio.gather(
    mp3_task.wait_result(),
    title_task.wait_result(),
)

# Then execute the callback steps
await chain_final_summaries.kiq(transcript_id, [r.return_value for r in results])
await task_pipeline_post_to_zulip.kiq(transcript_id)

7. Scheduled Tasks (Celery Beat → TaskIQ Scheduler)

Current (Celery Beat)

# reflector/worker/app.py
app.conf.beat_schedule = {
    "process_messages": {
        "task": "reflector.worker.process.process_messages",
        "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
    },
    "reprocess_failed_recordings": {
        "task": "reflector.worker.process.reprocess_failed_recordings",
        "schedule": crontab(hour=5, minute=0),
    },
}

New (TaskIQ Scheduler)

# reflector/worker/scheduler.py
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

from reflector.worker.broker import broker

# Schedules are declared as labels on the tasks below and read by
# LabelScheduleSource. (taskiq-redis also offers ListRedisScheduleSource
# for schedules created dynamically at runtime.)

@broker.task(
    # NOTE: cron has one-minute resolution; a sub-minute
    # SQS_POLLING_TIMEOUT_SECONDS cannot be expressed here.
    schedule=[{"cron": "* * * * *"}]  # every minute
)
async def process_messages():
    # Task implementation
    pass

@broker.task(
    schedule=[{"cron": "0 5 * * *"}]  # daily at 05:00
)
async def reprocess_failed_recordings():
    # Task implementation
    pass

# Initialize scheduler with the label-based source
scheduler = TaskiqScheduler(broker, sources=[LabelScheduleSource(broker)])

# Run scheduler (separate process)
# taskiq scheduler reflector.worker.scheduler:scheduler

8. Retry Configuration

Current (Celery)

@shared_task(
    bind=True,
    max_retries=30,
    default_retry_delay=60,
    retry_backoff=True,
    retry_backoff_max=3600,
)
async def task_send_webhook_if_needed(self, ...):
    try:
        ...  # Task logic
    except Exception as exc:
        raise self.retry(exc=exc)

New (TaskIQ)

from taskiq.middlewares import SimpleRetryMiddleware

# Global middleware configuration (already applied in broker.py)
broker = broker.with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)

# For specific tasks, enable retries via task labels:
@broker.task(retry_on_error=True, max_retries=30)
async def task_send_webhook_if_needed(...):
    # Task logic - uncaught exceptions trigger a retry
    pass

Note: SimpleRetryMiddleware re-enqueues failed tasks immediately; Celery's retry_backoff behavior (exponential delay capped by retry_backoff_max) has no built-in equivalent and would need custom middleware.

Testing Migration

Current Pytest Setup (Celery)

# tests/conftest.py
@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
    }

@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
async def test_task():
    pass

New Pytest Setup (TaskIQ)

# tests/conftest.py
import pytest

from reflector.worker.broker import broker

@pytest.fixture(scope="function", autouse=True)
async def setup_taskiq_broker():
    """Manage the broker lifecycle for each test.

    Swapping in a fresh InMemoryBroker here would not work: tasks
    decorated with @broker.task keep a reference to the original broker.
    Instead, broker.py already builds an InMemoryBroker(await_inplace=True)
    when ENVIRONMENT=pytest, so this fixture only starts and stops it."""
    await broker.startup()
    yield broker
    await broker.shutdown()

@pytest.fixture
async def taskiq_with_db_session(db_session):
    """Setup TaskIQ with a database session for TaskiqDepends-style tasks"""
    from sqlalchemy.ext.asyncio import AsyncSession

    from reflector.worker.broker import broker

    broker.add_dependency_context({
        AsyncSession: db_session,
    })
    yield
    broker.custom_dependency_context = {}

# Test example
@pytest.mark.anyio
async def test_task(taskiq_with_db_session):
    task = await task_pipeline_file_process.kiq("transcript-id")
    result = await task.wait_result()
    assert not result.is_err

Migration Steps

Phase 1: Setup (Week 1)

  1. Install TaskIQ packages

    uv add taskiq taskiq-redis taskiq-pipelines
    
  2. Create new broker configuration

    • Create reflector/worker/broker.py with TaskIQ broker setup
    • Create reflector/worker/dependencies.py for dependency injection (see the sketch after this list)
  3. Update settings

    • Keep existing Redis configuration
    • Add TaskIQ-specific settings if needed
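
A minimal sketch of reflector/worker/dependencies.py, wrapping the existing get_session() generator so tasks can receive a session through TaskiqDepends; the name get_db_session is a placeholder, not an existing helper:

# reflector/worker/dependencies.py (sketch)
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from reflector.db import get_session

async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    # Async generator dependency: yields a session, cleans up afterwards
    async for session in get_session():
        yield session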

Phase 2: Parallel Running (Week 2-3)

  1. Migrate simple tasks first

    • Start with cleanup.py (1 task)
    • Move to webhook.py (1 task)
    • Test thoroughly in isolation
  2. Setup dual-mode operation

    • Keep Celery tasks running
    • Add TaskIQ versions alongside
    • Use feature flags to switch between them (see the sketch after this list)
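
One possible shape for the feature flag, assuming a USE_TASKIQ setting that does not exist yet and the dual imports shown later in this document:

# Hypothetical dispatch helper for dual-mode operation
async def enqueue_file_process(transcript_id: str) -> None:
    if settings.USE_TASKIQ:  # assumed feature flag
        await task_pipeline_file_process_taskiq.kiq(transcript_id=transcript_id)
    else:
        task_pipeline_file_process.delay(transcript_id=transcript_id)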

Phase 3: Complex Tasks (Week 3-4)

  1. Migrate pipeline tasks

    • Convert main_file_pipeline.py
    • Convert main_live_pipeline.py (most complex with 10 tasks)
    • Ensure chain/group/chord patterns work
  2. Migrate scheduled tasks

    • Setup TaskIQ scheduler
    • Convert beat schedule to TaskIQ schedules
    • Test cron patterns

Phase 4: Testing & Validation (Week 4-5)

  1. Update test suite

    • Replace Celery fixtures with TaskIQ fixtures
    • Update all test files
    • Ensure coverage remains the same
  2. Performance testing

    • Compare task execution times
    • Monitor Redis memory usage
    • Test under load

Phase 5: Cutover (Week 5-6)

  1. Final migration

    • Remove Celery dependencies
    • Update deployment scripts
    • Update documentation
  2. Monitoring

    • Setup TaskIQ monitoring (if available)
    • Create health checks
    • Document operational procedures

Key Differences to Note

Advantages of TaskIQ

  1. Native async support - No need for @asynctask wrapper
  2. Dependency injection - Cleaner than decorators for session management
  3. Type hints - Better IDE support and autocompletion
  4. Modern Python - async-native design targeting recent Python versions
  5. Simpler testing - InMemoryBroker makes testing easier

Potential Challenges

  1. Less mature ecosystem - Fewer third-party integrations
  2. Documentation - Less comprehensive than Celery
  3. Monitoring tools - No Flower equivalent (may need custom solution)
  4. Community support - Smaller community than Celery

Command Line Changes

Current (Celery)

# Start worker
celery -A reflector.worker.app worker --loglevel=info

# Start beat scheduler
celery -A reflector.worker.app beat

New (TaskIQ)

# Start worker
taskiq worker reflector.worker.broker:broker

# Start scheduler
taskiq scheduler reflector.worker.scheduler:scheduler

# With custom settings
taskiq worker reflector.worker.broker:broker --workers 4 --log-level INFO

Rollback Plan

If issues arise during migration:

  1. Keep Celery code in version control - Tag the last Celery version
  2. Maintain dual broker setup - Can switch back via environment variable
  3. Database compatibility - No schema changes required
  4. Redis compatibility - Both use Redis, easy to switch back

Success Criteria

  1. All tasks migrated and functioning
  2. Test coverage maintained at current levels
  3. Performance equal or better than Celery
  4. Scheduled tasks running reliably
  5. Error handling and retries working correctly
  6. WebSocket notifications still functioning
  7. Pipeline processing maintaining same behavior

Monitoring & Operations

Health Checks

# reflector/worker/healthcheck.py
from datetime import datetime

from reflector.worker.broker import broker

@broker.task
async def healthcheck_ping():
    """TaskIQ health check task"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}
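
To verify liveness end to end, kick the task and await its result (a sketch; the 5-second timeout is arbitrary):

task = await healthcheck_ping.kiq()
result = await task.wait_result(timeout=5)
assert result.return_value["status"] == "healthy"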

Metrics Collection

  • Task execution times
  • Success/failure rates
  • Queue depths
  • Worker utilization
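
These metrics can be collected with a custom middleware: TaskiqMiddleware exposes pre/post-execution hooks, and TaskiqResult carries execution_time and is_err. A minimal logging-based sketch (exporting to a real metrics backend is left out):

# reflector/worker/metrics.py (sketch)
import logging

from taskiq import TaskiqMiddleware
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult

logger = logging.getLogger(__name__)

class MetricsMiddleware(TaskiqMiddleware):
    """Log execution time and outcome for every executed task."""

    def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        status = "failure" if result.is_err else "success"
        logger.info(
            "task=%s status=%s execution_time=%.3fs",
            message.task_name,
            status,
            result.execution_time,
        )

# Registered alongside the other middlewares in broker.py:
# broker = broker.with_middlewares(MetricsMiddleware())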

Key Implementation Points - MUST READ

Critical Changes Required

  1. Session Management in Tasks

    • VERIFIED: Tasks MUST use get_session() from reflector.db for test compatibility
    • Do NOT use get_session_factory() directly in tasks - it bypasses test mocks
    • The test database session IS properly shared when using get_session()
  2. Task Invocation Changes

    • Replace .delay() with await .kiq()
    • All task invocations become async/await
    • No need to commit sessions before task invocation (controllers handle this)
  3. Broker Configuration

    • TaskIQ broker must be initialized in worker/app.py
    • Use InMemoryBroker(await_inplace=True) for testing
    • Use RedisStreamBroker for production
  4. Test Setup Requirements

    • Set os.environ["ENVIRONMENT"] = "pytest" at the top of test files, before any broker import (see the snippet after this list)
    • Add TaskIQ broker fixture to test functions
    • Keep Celery fixtures for now (dual-mode operation)
  5. Import Pattern Changes

    # Each file needs both imports during migration
    from reflector.pipelines.main_file_pipeline import (
        task_pipeline_file_process,        # Celery version
        task_pipeline_file_process_taskiq, # TaskIQ version
    )
    
  6. Decorator Changes

    • Remove @asynctask - TaskIQ is async-native
    • Keep @with_session - it works with TaskIQ tasks!
    • Remove @shared_task from TaskIQ version
    • Keep @shared_task on Celery version for backward compatibility
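
The snippet referenced in point 4: the environment variable must be set before anything imports the broker module, otherwise the Redis broker is created instead of the in-memory one (the file name is illustrative):

# tests/test_pipeline.py (sketch) - set before importing the broker
import os

os.environ["ENVIRONMENT"] = "pytest"

from reflector.worker.broker import broker  # now an InMemoryBroker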

Verified POC Results

  • Database transactions work correctly across test and TaskIQ tasks
  • Tasks execute immediately in tests with InMemoryBroker(await_inplace=True)
  • Session mocking works when using get_session()
  • "OK" output confirmed - TaskIQ task executes and accesses test data

Conclusion

The migration from Celery to TaskIQ is feasible and offers several advantages for an async-first codebase like Reflector. The key challenges will be:

  1. Migrating complex pipeline patterns (chain/chord)
  2. Ensuring scheduled task reliability
  3. SOLVED: Maintaining session management patterns - use get_session()
  4. Updating the test suite

The phased approach allows for gradual migration with minimal risk. The ability to run both systems in parallel provides a safety net during the transition period.

Appendix: Quick Reference

Celery            TaskIQ
@shared_task      @broker.task
.delay()          await .kiq()
.apply_async()    .kicker().kiq()
chain()           Pipeline()
group()           asyncio.gather() over .wait_result()
chord()           gathered results + callback task
@task.retry()     retry_on_error=True
Celery Beat       TaskIQ Scheduler
celery worker     taskiq worker
Flower            custom monitoring needed