mirror of https://github.com/Monadical-SAS/reflector.git
Compare commits - branch mathieu/sq, 3 commits: 8fcfac80fa, 9b3da4b2c8, d86dc59bf2

server/migration.md (new file, 583 lines)
@@ -0,0 +1,583 @@
# Celery to TaskIQ Migration Guide

## Executive Summary

This document outlines the migration path from Celery to TaskIQ for the Reflector project. TaskIQ is a modern, async-first distributed task queue that provides similar functionality to Celery while being designed specifically for async Python applications.

## Current Celery Usage Analysis

### Key Patterns in Use
1. **Task Decorators**: `@shared_task`, `@asynctask`, `@with_session`
2. **Task Invocation**: `.delay()` for execution, `.si()` for signatures
3. **Workflow Patterns**: `chain()`, `group()`, `chord()` for complex pipelines
4. **Scheduled Tasks**: Celery Beat with crontab and periodic schedules
5. **Session Management**: Custom `@with_session` and `@with_session_and_transcript` decorators
6. **Retry Logic**: Auto-retry with exponential backoff
7. **Redis Backend**: Redis as both broker and result backend

### Critical Files to Migrate
- `reflector/worker/app.py` - Celery app configuration and beat schedule
- `reflector/worker/session_decorator.py` - Session management decorators
- `reflector/pipelines/main_file_pipeline.py` - File processing pipeline
- `reflector/pipelines/main_live_pipeline.py` - Live streaming pipeline (10 tasks)
- `reflector/worker/process.py` - Background processing tasks
- `reflector/worker/ics_sync.py` - Calendar sync tasks
- `reflector/worker/cleanup.py` - Cleanup tasks
- `reflector/worker/webhook.py` - Webhook notifications

## TaskIQ Architecture Mapping

### 1. Installation

```bash
# Remove Celery dependencies
uv remove celery flower

# Install TaskIQ with Redis support
uv add taskiq taskiq-redis taskiq-pipelines
```

### 2. Broker Configuration

#### Current (Celery)
```python
# reflector/worker/app.py
from celery import Celery

app = Celery(
    "reflector",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=[...],
)
```

#### New (TaskIQ)
```python
# reflector/worker/broker.py
import os

from taskiq import SimpleRetryMiddleware
from taskiq_pipelines import PipelineMiddleware
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

from reflector.settings import settings

result_backend = RedisAsyncResultBackend(
    redis_url=settings.REDIS_URL,
    result_ex_time=86400,  # keep results for 24 hours
)

broker = (
    RedisStreamBroker(
        url=settings.REDIS_URL,
        max_connection_pool_size=10,
    )
    .with_result_backend(result_backend)
    .with_middlewares(
        PipelineMiddleware(),  # for chain/group/chord-style pipelines
        SimpleRetryMiddleware(default_retry_count=3),
    )
)

# For the testing environment, run tasks inline in the test process
if os.environ.get("ENVIRONMENT") == "pytest":
    from taskiq import InMemoryBroker

    broker = InMemoryBroker(await_inplace=True)
```

### 3. Task Definition Migration

#### Current (Celery)
```python
@shared_task
@asynctask
@with_session
async def task_pipeline_file_process(session: AsyncSession, transcript_id: str):
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()
```

#### New (TaskIQ)
```python
from reflector.db import get_session
from reflector.worker.broker import broker


@broker.task
async def task_pipeline_file_process(transcript_id: str):
    # Use get_session so the session can be mocked properly in tests
    async for session in get_session():
        pipeline = PipelineMainFile(transcript_id=transcript_id)
        await pipeline.process()
```

### 4. Session Management

#### Current Session Decorators (Keep Using These!)
```python
# reflector/worker/session_decorator.py
import functools

from reflector.db import get_session_context


def with_session(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        async with get_session_context() as session:
            return await func(session, *args, **kwargs)

    return wrapper
```

#### Session Management Strategy

**⚠️ CRITICAL**: The key insight is to maintain consistent session management patterns:

1. **For worker tasks**: Continue using the `@with_session` decorator pattern
2. **For FastAPI endpoints**: Use `get_session` dependency injection
3. **Never use `get_session_factory()` directly** in application code

```python
# APPROACH 1: Simple migration keeping the decorator pattern
from reflector.worker.session_decorator import with_session


@taskiq_broker.task
@with_session
async def task_pipeline_file_process(session, *, transcript_id: str):
    # Session is provided by the decorator, just like the Celery version
    transcript = await transcripts_controller.get_by_id(session, transcript_id)
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()


# APPROACH 2: For test compatibility without the decorator
from reflector.db import get_session


@taskiq_broker.task
async def task_pipeline_file_process(transcript_id: str):
    # Use get_session, which is mocked in tests
    async for session in get_session():
        transcript = await transcripts_controller.get_by_id(session, transcript_id)
        pipeline = PipelineMainFile(transcript_id=transcript_id)
        await pipeline.process()


# APPROACH 3: Future - TaskIQ dependency injection (after full migration)
from sqlalchemy.ext.asyncio import AsyncSession
from taskiq import TaskiqDepends


async def get_session_dep():
    """Async-generator dependency that yields a database session"""
    async for session in get_session():
        yield session


@taskiq_broker.task
async def task_pipeline_file_process(
    transcript_id: str,
    session: AsyncSession = TaskiqDepends(get_session_dep),
):
    transcript = await transcripts_controller.get_by_id(session, transcript_id)
    pipeline = PipelineMainFile(transcript_id=transcript_id)
    await pipeline.process()
```

**Key Points:**
- The `@with_session` decorator works with TaskIQ tasks (remove `@asynctask`, keep `@with_session`)
- For testing: `get_session()` from `reflector.db` is properly mocked
- Never call `get_session_factory()` directly - always use the abstractions

### 5. Task Invocation

#### Current (Celery)
```python
# Simple async execution
task_pipeline_file_process.delay(transcript_id=transcript.id)

# With signature for chaining
task_cleanup_consent.si(transcript_id=transcript_id)
```

#### New (TaskIQ)
```python
# Simple async execution
await task_pipeline_file_process.kiq(transcript_id=transcript.id)

# With kicker for advanced configuration
await task_cleanup_consent.kicker().with_labels(
    priority="high"
).kiq(transcript_id=transcript_id)
```

### 6. Workflow Patterns (Chain, Group, Chord)

#### Current (Celery)
```python
from celery import chain, group, chord

# Chain example
post_chain = chain(
    task_cleanup_consent.si(transcript_id=transcript_id),
    task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
    task_send_webhook_if_needed.si(transcript_id=transcript_id),
)

# Chord example (parallel + callback)
chain = chord(
    group(chain_mp3_and_diarize, chain_title_preview),
    chain_final_summaries,
) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
```

#### New (TaskIQ with Pipelines)
```python
import asyncio

from taskiq_pipelines import Pipeline

from reflector.worker.broker import broker

# Chain example using Pipeline
post_pipeline = (
    Pipeline(broker, task_cleanup_consent)
    .call_next(task_pipeline_post_to_zulip, transcript_id=transcript_id)
    .call_next(task_send_webhook_if_needed, transcript_id=transcript_id)
)
await post_pipeline.kiq(transcript_id=transcript_id)

# Parallel execution: kick both branches, then wait for their results
handles = await asyncio.gather(
    chain_mp3_and_diarize.kiq(transcript_id),
    chain_title_preview.kiq(transcript_id),
)
results = [await handle.wait_result() for handle in handles]

# Then execute the callbacks (chord semantics)
await chain_final_summaries.kiq(transcript_id, results)
await task_pipeline_post_to_zulip.kiq(transcript_id)
```

### 7. Scheduled Tasks (Celery Beat → TaskIQ Scheduler)

#### Current (Celery Beat)
```python
# reflector/worker/app.py
app.conf.beat_schedule = {
    "process_messages": {
        "task": "reflector.worker.process.process_messages",
        "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
    },
    "reprocess_failed_recordings": {
        "task": "reflector.worker.process.reprocess_failed_recordings",
        "schedule": crontab(hour=5, minute=0),
    },
}
```

#### New (TaskIQ Scheduler)
```python
# reflector/worker/scheduler.py
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

from reflector.worker.broker import broker

# Reads the `schedule` labels declared on the tasks below
schedule_source = LabelScheduleSource(broker)


# Define scheduled tasks with decorators.
# Note: taskiq cron expressions have minute granularity, so the
# sub-minute SQS polling interval cannot be expressed directly here;
# run the task every minute and poll in a loop inside it if needed.
@broker.task(schedule=[{"cron": "* * * * *"}])  # every minute
async def process_messages():
    # Task implementation
    pass


@broker.task(schedule=[{"cron": "0 5 * * *"}])  # daily at 5 AM
async def reprocess_failed_recordings():
    # Task implementation
    pass


# Initialize scheduler
scheduler = TaskiqScheduler(broker, sources=[schedule_source])

# Run scheduler (separate process)
# taskiq scheduler reflector.worker.scheduler:scheduler
```

### 8. Retry Configuration

#### Current (Celery)
```python
@shared_task(
    bind=True,
    max_retries=30,
    default_retry_delay=60,
    retry_backoff=True,
    retry_backoff_max=3600,
)
async def task_send_webhook_if_needed(self, *, transcript_id: str):
    try:
        ...  # Task logic
    except Exception as exc:
        raise self.retry(exc=exc)
```

#### New (TaskIQ)
```python
from taskiq.middlewares import SimpleRetryMiddleware

# Global middleware configuration (default retry count of 3)
broker = broker.with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)


# For specific tasks with custom retry limits:
@broker.task(retry_on_error=True, max_retries=30)
async def task_send_webhook_if_needed(*, transcript_id: str):
    # Task logic - exceptions trigger an automatic retry.
    # Note: SimpleRetryMiddleware re-enqueues immediately; Celery's
    # exponential backoff has no direct equivalent here.
    pass
```

## Testing Migration

### Current Pytest Setup (Celery)
```python
# tests/conftest.py
@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
    }


@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
async def test_task():
    pass
```

### New Pytest Setup (TaskIQ)
```python
# tests/conftest.py
import os

# Must be set before reflector.worker.broker is imported, so that the
# broker module swaps in the InMemoryBroker
os.environ["ENVIRONMENT"] = "pytest"

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from reflector.worker.broker import broker  # InMemoryBroker under pytest


@pytest.fixture(scope="function", autouse=True)
async def setup_taskiq_broker():
    """Start and stop the in-memory broker around each test"""
    await broker.startup()
    yield broker
    await broker.shutdown()


@pytest.fixture
async def taskiq_with_db_session(db_session):
    """Provide the test database session through TaskIQ dependency injection"""
    broker.add_dependency_context({AsyncSession: db_session})
    yield
    broker.custom_dependency_context = {}


# Test example
@pytest.mark.anyio
async def test_task(taskiq_with_db_session):
    # With InMemoryBroker(await_inplace=True), .kiq() executes the task inline
    result = await task_pipeline_file_process.kiq("transcript-id")
    assert result is not None
```

## Migration Steps

### Phase 1: Setup (Week 1)
1. **Install TaskIQ packages**
   ```bash
   uv add taskiq taskiq-redis taskiq-pipelines
   ```

2. **Create new broker configuration**
   - Create `reflector/worker/broker.py` with the TaskIQ broker setup
   - Create `reflector/worker/dependencies.py` for dependency injection (see the sketch below)
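
A minimal sketch of what `reflector/worker/dependencies.py` could contain, assuming it only wraps `get_session()` from `reflector.db` (the module and the `get_db_session` helper are proposed here, not existing code):

```python
# reflector/worker/dependencies.py (proposed)
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from reflector.db import get_session


async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    # Async-generator dependency; TaskIQ resolves it via TaskiqDepends
    async for session in get_session():
        yield session
```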

3. **Update settings**
   - Keep the existing Redis configuration
   - Add TaskIQ-specific settings if needed

### Phase 2: Parallel Running (Weeks 2-3)
1. **Migrate simple tasks first**
   - Start with `cleanup.py` (1 task)
   - Move to `webhook.py` (1 task)
   - Test thoroughly in isolation

2. **Setup dual-mode operation**
   - Keep Celery tasks running
   - Add TaskIQ versions alongside
   - Use feature flags to switch between them (see the sketch below)
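
A minimal sketch of such a feature flag, assuming a hypothetical `USE_TASKIQ` setting (not an existing Reflector setting) and the dual-import pattern described under "Key Implementation Points" below:

```python
from reflector.pipelines.main_file_pipeline import (
    task_pipeline_file_process,  # Celery version
    task_pipeline_file_process_taskiq,  # TaskIQ version
)
from reflector.settings import settings


async def enqueue_file_process(transcript_id: str) -> None:
    # USE_TASKIQ is a hypothetical flag for the transition period
    if settings.USE_TASKIQ:
        await task_pipeline_file_process_taskiq.kiq(transcript_id=transcript_id)
    else:
        task_pipeline_file_process.delay(transcript_id=transcript_id)
```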

### Phase 3: Complex Tasks (Weeks 3-4)
1. **Migrate pipeline tasks**
   - Convert `main_file_pipeline.py`
   - Convert `main_live_pipeline.py` (the most complex, with 10 tasks)
   - Ensure the chain/group/chord patterns still work

2. **Migrate scheduled tasks**
   - Set up the TaskIQ scheduler
   - Convert the beat schedule to TaskIQ schedules
   - Test the cron patterns

### Phase 4: Testing & Validation (Weeks 4-5)
1. **Update test suite**
   - Replace Celery fixtures with TaskIQ fixtures
   - Update all test files
   - Ensure coverage remains the same

2. **Performance testing**
   - Compare task execution times
   - Monitor Redis memory usage
   - Test under load

### Phase 5: Cutover (Weeks 5-6)
1. **Final migration**
   - Remove Celery dependencies
   - Update deployment scripts
   - Update documentation

2. **Monitoring**
   - Set up TaskIQ monitoring (if available)
   - Create health checks
   - Document operational procedures

## Key Differences to Note

### Advantages of TaskIQ
1. **Native async support** - No need for the `@asynctask` wrapper
2. **Dependency injection** - Cleaner than decorators for session management
3. **Type hints** - Better IDE support and autocompletion
4. **Modern Python** - Designed for Python 3.7+
5. **Simpler testing** - `InMemoryBroker` makes testing easier

### Potential Challenges
1. **Less mature ecosystem** - Fewer third-party integrations
2. **Documentation** - Less comprehensive than Celery's
3. **Monitoring tools** - No Flower equivalent (may need a custom solution)
4. **Community support** - Smaller community than Celery's

## Command Line Changes

### Current (Celery)
```bash
# Start worker
celery -A reflector.worker.app worker --loglevel=info

# Start beat scheduler
celery -A reflector.worker.app beat
```

### New (TaskIQ)
```bash
# Start worker
taskiq worker reflector.worker.broker:broker

# Start scheduler
taskiq scheduler reflector.worker.scheduler:scheduler

# With custom settings
taskiq worker reflector.worker.broker:broker --workers 4 --log-level INFO
```

## Rollback Plan

If issues arise during migration:

1. **Keep Celery code in version control** - Tag the last Celery version
2. **Maintain dual broker setup** - Can switch back via an environment variable (see the sketch below)
3. **Database compatibility** - No schema changes required
4. **Redis compatibility** - Both use Redis, so switching back is easy
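
A minimal sketch of such an environment switch, assuming a hypothetical `TASK_BACKEND` variable (invented here for illustration):

```python
import os

# TASK_BACKEND is a hypothetical variable for the transition period
if os.environ.get("TASK_BACKEND", "taskiq") == "celery":
    from reflector.worker.app import app as task_backend  # Celery app
else:
    from reflector.worker.broker import broker as task_backend  # TaskIQ broker
```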

## Success Criteria

1. ✅ All tasks migrated and functioning
2. ✅ Test coverage maintained at current levels
3. ✅ Performance equal to or better than Celery
4. ✅ Scheduled tasks running reliably
5. ✅ Error handling and retries working correctly
6. ✅ WebSocket notifications still functioning
7. ✅ Pipeline processing maintaining the same behavior
## Monitoring & Operations
|
||||
|
||||
### Health Checks
|
||||
```python
|
||||
# reflector/worker/healthcheck.py
|
||||
@broker.task
|
||||
async def healthcheck_ping():
|
||||
"""TaskIQ health check task"""
|
||||
return {"status": "healthy", "timestamp": datetime.now()}
|
||||
```

### Metrics Collection
- Task execution times
- Success/failure rates
- Queue depths
- Worker utilization
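
A minimal sketch of how the first two items could be collected with a custom TaskIQ middleware; the `MetricsMiddleware` name and the logging-based approach are a proposal, not existing code:

```python
import time

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from reflector.logger import logger


class MetricsMiddleware(TaskiqMiddleware):
    def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        # Stamp the start time onto the message labels
        message.labels["_started_at"] = str(time.monotonic())
        return message

    def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        started = float(message.labels.get("_started_at", time.monotonic()))
        logger.info(
            "task finished",
            task_name=message.task_name,
            duration=time.monotonic() - started,
            is_err=result.is_err,
        )


# Registered like the other middlewares:
# broker = broker.with_middlewares(MetricsMiddleware())
```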

## Key Implementation Points - MUST READ

### Critical Changes Required

1. **Session Management in Tasks**
   - ✅ **VERIFIED**: Tasks MUST use `get_session()` from `reflector.db` for test compatibility
   - ❌ Do NOT use `get_session_factory()` directly in tasks - it bypasses test mocks
   - ✅ The test database session IS properly shared when using `get_session()`

2. **Task Invocation Changes**
   - Replace `.delay()` with `await .kiq()` (see the snippet below)
   - All task invocations become async/await
   - No need to commit sessions before task invocation (controllers handle this)
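
The change in caller code, using the file-processing task from this guide as the example:

```python
# Before (Celery) - works from synchronous code
task_pipeline_file_process.delay(transcript_id=transcript.id)

# After (TaskIQ) - must be awaited, so callers need an async context
await task_pipeline_file_process.kiq(transcript_id=transcript.id)
```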

3. **Broker Configuration**
   - The TaskIQ broker must be initialized in `worker/app.py`
   - Use `InMemoryBroker(await_inplace=True)` for testing
   - Use `RedisStreamBroker` for production

4. **Test Setup Requirements**
   - Set `os.environ["ENVIRONMENT"] = "pytest"` at the top of test files
   - Add the TaskIQ broker fixture to test functions
   - Keep Celery fixtures for now (dual-mode operation)

5. **Import Pattern Changes**
   ```python
   # Each file needs both imports during migration
   from reflector.pipelines.main_file_pipeline import (
       task_pipeline_file_process,  # Celery version
       task_pipeline_file_process_taskiq,  # TaskIQ version
   )
   ```

6. **Decorator Changes**
   - Remove `@asynctask` - TaskIQ is async-native
   - **Keep `@with_session`** - it works with TaskIQ tasks!
   - Remove `@shared_task` from the TaskIQ version
   - Keep `@shared_task` on the Celery version for backward compatibility (see the comparison below)
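
Side by side, using a hypothetical `task_example` to show only the decorator stacks:

```python
# Before (Celery)
@shared_task
@asynctask
@with_session
async def task_example(session, *, transcript_id: str): ...


# After (TaskIQ) - async-native, session decorator retained
@taskiq_broker.task
@with_session
async def task_example(session, *, transcript_id: str): ...
```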

## Verified POC Results

✅ **Database transactions work correctly** across test code and TaskIQ tasks
✅ **Tasks execute immediately** in tests with `InMemoryBroker(await_inplace=True)`
✅ **Session mocking works** when using `get_session()` properly
✅ **"OK" output confirmed** - the TaskIQ task executes and accesses test data

## Conclusion

The migration from Celery to TaskIQ is feasible and offers several advantages for an async-first codebase like Reflector. The key challenges will be:

1. Migrating complex pipeline patterns (chain/chord)
2. Ensuring scheduled task reliability
3. **SOLVED**: Maintaining session management patterns - use `get_session()`
4. Updating the test suite

The phased approach allows for gradual migration with minimal risk. The ability to run both systems in parallel provides a safety net during the transition period.

## Appendix: Quick Reference

| Celery | TaskIQ |
|--------|--------|
| `@shared_task` | `@broker.task` |
| `.delay()` | `.kiq()` |
| `.apply_async()` | `.kicker().kiq()` |
| `chain()` | `Pipeline()` |
| `group()` | `asyncio.gather()` |
| `chord()` | `asyncio.gather()` + callback |
| `@task.retry()` | `retry_on_error=True` |
| Celery Beat | TaskIQ Scheduler |
| `celery worker` | `taskiq worker` |
| Flower | Custom monitoring needed |

@@ -23,14 +23,16 @@ def upgrade() -> None:
     op.drop_column("transcript", "search_vector_en")

     # Recreate the search vector column with long_summary included
-    op.execute("""
+    op.execute(
+        """
         ALTER TABLE transcript ADD COLUMN search_vector_en tsvector
         GENERATED ALWAYS AS (
             setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
             setweight(to_tsvector('english', coalesce(long_summary, '')), 'B') ||
             setweight(to_tsvector('english', coalesce(webvtt, '')), 'C')
         ) STORED
-    """)
+        """
+    )

     # Recreate the GIN index for the search vector
     op.create_index(

@@ -47,13 +49,15 @@ def downgrade() -> None:
     op.drop_column("transcript", "search_vector_en")

     # Recreate the original search vector column without long_summary
-    op.execute("""
+    op.execute(
+        """
         ALTER TABLE transcript ADD COLUMN search_vector_en tsvector
         GENERATED ALWAYS AS (
             setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
             setweight(to_tsvector('english', coalesce(webvtt, '')), 'B')
         ) STORED
-    """)
+        """
+    )

     # Recreate the GIN index for the search vector
     op.create_index(

@@ -21,13 +21,15 @@ def upgrade() -> None:
     if conn.dialect.name != "postgresql":
        return

-    op.execute("""
+    op.execute(
+        """
         ALTER TABLE transcript ADD COLUMN search_vector_en tsvector
         GENERATED ALWAYS AS (
             setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
             setweight(to_tsvector('english', coalesce(webvtt, '')), 'B')
         ) STORED
-    """)
+        """
+    )

     op.create_index(
         "idx_transcript_search_vector_en",

@@ -19,12 +19,14 @@ depends_on: Union[str, Sequence[str], None] = None


 def upgrade() -> None:
     # Set room_id to NULL for meetings that reference non-existent rooms
-    op.execute("""
+    op.execute(
+        """
         UPDATE meeting
         SET room_id = NULL
         WHERE room_id IS NOT NULL
         AND room_id NOT IN (SELECT id FROM room WHERE id IS NOT NULL)
-    """)
+        """
+    )


 def downgrade() -> None:

@@ -27,7 +27,8 @@ def upgrade() -> None:

     # Populate room_id for existing ROOM-type transcripts
     # This joins through recording -> meeting -> room to get the room_id
-    op.execute("""
+    op.execute(
+        """
         UPDATE transcript AS t
         SET room_id = r.id
         FROM recording rec

@@ -36,11 +37,13 @@ def upgrade() -> None:
         WHERE t.recording_id = rec.id
         AND t.source_kind = 'room'
         AND t.room_id IS NULL
-    """)
+        """
+    )

     # Fix missing meeting_id for ROOM-type transcripts
     # The meeting_id field exists but was never populated
-    op.execute("""
+    op.execute(
+        """
         UPDATE transcript AS t
         SET meeting_id = rec.meeting_id
         FROM recording rec

@@ -48,7 +51,8 @@ def upgrade() -> None:
         AND t.source_kind = 'room'
         AND t.meeting_id IS NULL
         AND rec.meeting_id IS NOT NULL
-    """)
+        """
+    )


 def downgrade() -> None:

@@ -26,7 +26,6 @@ dependencies = [
     "prometheus-fastapi-instrumentator>=6.1.0",
     "sentencepiece>=0.1.99",
     "protobuf>=4.24.3",
-    "celery>=5.3.4",
     "redis>=5.0.1",
     "python-jose[cryptography]>=3.3.0",
     "python-multipart>=0.0.6",

@@ -39,6 +38,8 @@ dependencies = [
     "pytest-env>=1.1.5",
     "webvtt-py>=0.5.0",
     "icalendar>=6.0.0",
+    "taskiq>=0.11.18",
+    "taskiq-redis>=1.1.0",
 ]

 [dependency-groups]

@@ -55,7 +56,6 @@ tests = [
     "pytest>=7.4.0",
     "httpx-ws>=0.4.1",
     "pytest-httpx>=0.23.1",
-    "pytest-celery>=0.0.0",
     "pytest-recording>=0.13.4",
     "pytest-docker>=3.2.3",
     "asgi-lifespan>=2.1.0",

@@ -88,8 +88,8 @@ app.include_router(zulip_router, prefix="/v1")
 app.include_router(whereby_router, prefix="/v1")
 add_pagination(app)

-# prepare celery
-from reflector.worker import app as celery_app  # noqa
+# prepare taskiq
+from reflector.worker import app as taskiq_app  # noqa


 # simpler openapi id
@@ -1,20 +0,0 @@
|
||||
import asyncio
|
||||
import functools
|
||||
|
||||
|
||||
def asynctask(f):
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
async def run_async():
|
||||
return await f(*args, **kwargs)
|
||||
|
||||
coro = run_async()
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = None
|
||||
if loop and loop.is_running():
|
||||
return loop.run_until_complete(coro)
|
||||
return asyncio.run(coro)
|
||||
|
||||
return wrapper
|
||||

@@ -1,3 +1,4 @@
+from contextlib import asynccontextmanager
 from typing import AsyncGenerator

 from sqlalchemy.ext.asyncio import (

@@ -45,6 +46,18 @@ async def _get_session() -> AsyncGenerator[AsyncSession, None]:


 async def get_session() -> AsyncGenerator[AsyncSession, None]:
     """
     Get a database session, fastapi dependency injection style
     """
     async for session in _get_session():
         yield session
+
+
+@asynccontextmanager
+async def get_session_context():
+    """
+    Get a database session as an async context manager
+    """
+    async for session in _get_session():
+        yield session

@@ -171,9 +171,11 @@ class CalendarEventController:
             .where(
                 sa.and_(
                     CalendarEventModel.room_id == room_id,
-                    CalendarEventModel.ics_uid.notin_(current_ics_uids)
-                    if current_ics_uids
-                    else True,
+                    (
+                        CalendarEventModel.ics_uid.notin_(current_ics_uids)
+                        if current_ics_uids
+                        else True
+                    ),
                     CalendarEventModel.end_time > datetime.now(timezone.utc),
                 )
             )

@@ -2,7 +2,6 @@ import enum
 import json
 import os
 import shutil
-from contextlib import asynccontextmanager
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Literal

@@ -481,7 +480,12 @@ class TranscriptController:
     # TODO investigate why mutate= is used. it's used in one place currently, maybe because of ORM field updates.
     # using mutate=True is discouraged
     async def update(
-        self, session: AsyncSession, transcript: Transcript, values: dict, mutate=False
+        self,
+        session: AsyncSession,
+        transcript: Transcript,
+        values: dict,
+        commit=True,
+        mutate=False,
     ) -> Transcript:
         """
         Update a transcript fields with key/values in values.

@@ -495,7 +499,8 @@ class TranscriptController:
             .values(**values)
         )
         await session.execute(query)
-        await session.commit()
+        if commit:
+            await session.commit()
         if mutate:
             for key, value in values.items():
                 setattr(transcript, key, value)

@@ -585,26 +590,21 @@ class TranscriptController:
         await session.execute(query)
         await session.commit()

-    @asynccontextmanager
-    async def transaction(self, session: AsyncSession):
-        """
-        A context manager for database transaction
-        """
-        async with session.begin():
-            yield
-
     async def append_event(
         self,
         session: AsyncSession,
         transcript: Transcript,
         event: str,
         data: Any,
+        commit=True,
     ) -> TranscriptEvent:
         """
         Append an event to a transcript
         """
         resp = transcript.add_event(event=event, data=data)
-        await self.update(session, transcript, {"events": transcript.events_dump()})
+        await self.update(
+            session, transcript, {"events": transcript.events_dump()}, commit=commit
+        )
         return resp

     async def upsert_topic(

@@ -699,19 +699,25 @@ class TranscriptController:

         Will add an event STATUS + update the status field of transcript
         """
-        async with self.transaction(session):
-            transcript = await self.get_by_id(session, transcript_id)
-            if not transcript:
-                raise Exception(f"Transcript {transcript_id} not found")
-            if transcript.status == status:
-                return
-            resp = await self.append_event(
-                session,
-                transcript=transcript,
-                event="STATUS",
-                data=StrValue(value=status),
-            )
-            await self.update(session, transcript, {"status": status})
+        transcript = await self.get_by_id(session, transcript_id)
+        if not transcript:
+            raise Exception(f"Transcript {transcript_id} not found")
+        if transcript.status == status:
+            return
+        resp = await self.append_event(
+            session,
+            transcript=transcript,
+            event="STATUS",
+            data=StrValue(value=status),
+            commit=False,
+        )
+        await self.update(
+            session,
+            transcript,
+            {"status": status},
+            commit=False,
+        )
+        await session.commit()
         return resp

@@ -12,11 +12,8 @@ from pathlib import Path

 import av
 import structlog
-from celery import chain, shared_task
 from sqlalchemy.ext.asyncio import AsyncSession

-from reflector.asynctask import asynctask
-from reflector.db import get_session_factory
 from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import (
     SourceKind,

@@ -28,8 +25,8 @@ from reflector.logger import logger
 from reflector.pipelines.main_live_pipeline import (
     PipelineMainBase,
     broadcast_to_sockets,
-    task_cleanup_consent,
-    task_pipeline_post_to_zulip,
+    task_cleanup_consent_taskiq,
+    task_pipeline_post_to_zulip_taskiq,
 )
 from reflector.processors import (
     AudioFileWriterProcessor,

@@ -55,8 +52,9 @@ from reflector.processors.types import (
 )
 from reflector.settings import settings
 from reflector.storage import get_transcripts_storage
-from reflector.worker.session_decorator import with_session
-from reflector.worker.webhook import send_transcript_webhook
+from reflector.worker.app import taskiq_broker
+from reflector.worker.session_decorator import catch_exception, with_session
+from reflector.worker.webhook import send_transcript_webhook_taskiq


 class EmptyPipeline:

@@ -98,31 +96,29 @@ class PipelineMainFile(PipelineMainBase):
     )

     @broadcast_to_sockets
-    async def set_status(self, transcript_id: str, status: TranscriptStatus):
-        async with self.lock_transaction():
-            async with get_session_factory()() as session:
-                return await transcripts_controller.set_status(
-                    session, transcript_id, status
-                )
+    async def set_status(
+        self,
+        session: AsyncSession,
+        transcript_id: str,
+        status: TranscriptStatus,
+    ):
+        return await transcripts_controller.set_status(session, transcript_id, status)

-    async def process(self, file_path: Path):
+    async def process(self, session: AsyncSession, file_path: Path):
         """Main entry point for file processing"""
         self.logger.info(f"Starting file pipeline for {file_path}")

-        async with get_session_factory()() as session:
-            transcript = await transcripts_controller.get_by_id(
-                session, self.transcript_id
-            )
+        transcript = await transcripts_controller.get_by_id(session, self.transcript_id)

-            # Clear transcript as we're going to regenerate everything
-            await transcripts_controller.update(
-                session,
-                transcript,
-                {
-                    "events": [],
-                    "topics": [],
-                },
-            )
+        # Clear transcript as we're going to regenerate everything
+        await transcripts_controller.update(
+            session,
+            transcript,
+            {
+                "events": [],
+                "topics": [],
+            },
+        )

         # Extract audio and write to transcript location
         audio_path = await self.extract_and_write_audio(file_path, transcript)

@@ -141,8 +137,7 @@ class PipelineMainFile(PipelineMainBase):

         self.logger.info("File pipeline complete")

-        async with get_session_factory()() as session:
-            await transcripts_controller.set_status(session, transcript.id, "ended")
+        await transcripts_controller.set_status(session, transcript.id, "ended")

     async def extract_and_write_audio(
         self, file_path: Path, transcript: Transcript

@@ -393,11 +388,9 @@ class PipelineMainFile(PipelineMainBase):
         await processor.flush()


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def task_send_webhook_if_needed(session, *, transcript_id: str):
     """Send webhook if this is a room recording with webhook configured"""
     transcript = await transcripts_controller.get_by_id(session, transcript_id)
     if not transcript:
         return

@@ -411,25 +404,23 @@ async def task_send_webhook_if_needed(session, *, transcript_id: str):
             room_id=room.id,
             webhook_url=room.webhook_url,
         )
-        send_transcript_webhook.delay(
+        await send_transcript_webhook_taskiq.kiq(
             transcript_id, room.id, event_id=uuid.uuid4().hex
         )


-@shared_task
-@asynctask
+@taskiq_broker.task
+@catch_exception
 @with_session
-async def task_pipeline_file_process(session, *, transcript_id: str):
-    """Celery task for file pipeline processing"""
+async def task_pipeline_file_process(session: AsyncSession, *, transcript_id: str):
     transcript = await transcripts_controller.get_by_id(session, transcript_id)
     if not transcript:
         raise Exception(f"Transcript {transcript_id} not found")

     pipeline = PipelineMainFile(transcript_id=transcript_id)
     try:
-        await pipeline.set_status(transcript_id, "processing")
+        await pipeline.set_status(session, transcript_id, "processing")

         # Find the file to process
         audio_file = next(transcript.data_path.glob("upload.*"), None)
         if not audio_file:
             audio_file = next(transcript.data_path.glob("audio.*"), None)

@@ -437,16 +428,18 @@ async def task_pipeline_file_process(session, *, transcript_id: str):
         if not audio_file:
             raise Exception("No audio file found to process")

-        await pipeline.process(audio_file)
+        await pipeline.process(session, audio_file)

     except Exception:
-        await pipeline.set_status(transcript_id, "error")
         logger.error("Error while processing the file", exc_info=True)
+        try:
+            await pipeline.set_status(session, transcript_id, "error")
+        except:
+            logger.error(
+                "Error setting status in task_pipeline_file_process during exception, ignoring it"
+            )
         raise

     # Run post-processing chain: consent cleanup -> zulip -> webhook
-    post_chain = chain(
-        task_cleanup_consent.si(transcript_id=transcript_id),
-        task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
-        task_send_webhook_if_needed.si(transcript_id=transcript_id),
-    )
-    post_chain.delay()
+    await task_cleanup_consent_taskiq.kiq(transcript_id=transcript_id)
+    await task_pipeline_post_to_zulip_taskiq.kiq(transcript_id=transcript_id)
+    await task_send_webhook_if_needed.kiq(transcript_id=transcript_id)

@@ -12,19 +12,16 @@ It is directly linked to our data model.
 """

 import asyncio
 import functools
 from contextlib import asynccontextmanager
 from typing import Generic

 import av
 import boto3
-from celery import chord, current_task, group, shared_task
 from pydantic import BaseModel
 from sqlalchemy.ext.asyncio import AsyncSession
 from structlog import BoundLogger as Logger

-from reflector.asynctask import asynctask
-from reflector.db import get_session_factory
+from reflector.db import get_session_context
 from reflector.db.meetings import meeting_consent_controller, meetings_controller
 from reflector.db.recordings import recordings_controller
 from reflector.db.rooms import rooms_controller

@@ -64,6 +61,7 @@ from reflector.processors.types import (
 from reflector.processors.types import Transcript as TranscriptProcessorType
 from reflector.settings import settings
 from reflector.storage import get_transcripts_storage
+from reflector.worker.app import taskiq_broker
 from reflector.worker.session_decorator import with_session_and_transcript
 from reflector.ws_manager import WebsocketManager, get_ws_manager
 from reflector.zulip import (

@@ -91,40 +89,6 @@ def broadcast_to_sockets(func):
     return wrapper


-def get_transcript(func):
-    """
-    Decorator to fetch the transcript from the database from the first argument
-    """
-
-    @functools.wraps(func)
-    async def wrapper(**kwargs):
-        transcript_id = kwargs.pop("transcript_id")
-        async with get_session_factory()() as session:
-            transcript = await transcripts_controller.get_by_id(session, transcript_id)
-        if not transcript:
-            raise Exception(f"Transcript {transcript_id} not found")
-
-        # Enhanced logger with Celery task context
-        tlogger = logger.bind(transcript_id=transcript.id)
-        if current_task:
-            tlogger = tlogger.bind(
-                task_id=current_task.request.id,
-                task_name=current_task.name,
-                worker_hostname=current_task.request.hostname,
-                task_retries=current_task.request.retries,
-                transcript_id=transcript_id,
-            )
-
-        try:
-            result = await func(transcript=transcript, logger=tlogger, **kwargs)
-            return result
-        except Exception as exc:
-            tlogger.error("Pipeline error", function_name=func.__name__, exc_info=exc)
-            raise
-
-    return wrapper
-
-
 class StrValue(BaseModel):
     value: str

@@ -175,9 +139,9 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
         yield

     @asynccontextmanager
-    async def transaction(self):
+    async def locked_session(self):
         async with self.lock_transaction():
-            async with get_session_factory()() as session:
+            async with get_session_context() as session:
                 yield session

     @broadcast_to_sockets

@@ -209,14 +173,14 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]

         # when the status of the pipeline changes, update the transcript
         async with self._lock:
-            async with get_session_factory()() as session:
+            async with get_session_context() as session:
                 return await transcripts_controller.set_status(
                     session, self.transcript_id, status
                 )

     @broadcast_to_sockets
     async def on_transcript(self, data):
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             transcript = await self.get_transcript(session)
             return await transcripts_controller.append_event(
                 session,

@@ -236,7 +200,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
         )
         if isinstance(data, TitleSummaryWithIdProcessorType):
             topic.id = data.id
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             transcript = await self.get_transcript(session)
             await transcripts_controller.upsert_topic(session, transcript, topic)
             return await transcripts_controller.append_event(

@@ -249,7 +213,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
     @broadcast_to_sockets
     async def on_title(self, data):
         final_title = TranscriptFinalTitle(title=data.title)
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             transcript = await self.get_transcript(session)
             if not transcript.title:
                 await transcripts_controller.update(

@@ -269,7 +233,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
     @broadcast_to_sockets
     async def on_long_summary(self, data):
         final_long_summary = TranscriptFinalLongSummary(long_summary=data.long_summary)
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             transcript = await self.get_transcript(session)
             await transcripts_controller.update(
                 session,

@@ -290,7 +254,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
         final_short_summary = TranscriptFinalShortSummary(
             short_summary=data.short_summary
         )
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             transcript = await self.get_transcript(session)
             await transcripts_controller.update(
                 session,

@@ -308,7 +272,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]

     @broadcast_to_sockets
     async def on_duration(self, data):
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             duration = TranscriptDuration(duration=data)

             transcript = await self.get_transcript(session)

@@ -325,7 +289,7 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]

     @broadcast_to_sockets
     async def on_waveform(self, data):
-        async with self.transaction() as session:
+        async with self.locked_session() as session:
             waveform = TranscriptWaveform(waveform=data)

             transcript = await self.get_transcript(session)

@@ -344,7 +308,7 @@ class PipelineMainLive(PipelineMainBase):
     async def create(self) -> Pipeline:
         # create a context for the whole rtc transaction
         # add a customised logger to the context
-        async with get_session_factory()() as session:
+        async with get_session_context() as session:
             transcript = await self.get_transcript(session)

         processors = [

@@ -393,7 +357,7 @@ class PipelineMainDiarization(PipelineMainBase[AudioDiarizationInput]):
         # now let's start the pipeline by pushing information to the
         # first processor diarization processor
         # XXX translation is lost when converting our data model to the processor model
-        async with get_session_factory()() as session:
+        async with get_session_context() as session:
             transcript = await self.get_transcript(session)

             # diarization works only if the file is uploaded to an external storage

@@ -427,7 +391,7 @@ class PipelineMainFromTopics(PipelineMainBase[TitleSummaryWithIdProcessorType]):

     async def create(self) -> Pipeline:
         # get transcript
-        async with get_session_factory()() as session:
+        async with get_session_context() as session:
             self._transcript = transcript = await self.get_transcript(session)

         # create pipeline

@@ -488,8 +452,7 @@ class PipelineMainWaveform(PipelineMainFromTopics):
     ]


-@get_transcript
-async def pipeline_remove_upload(transcript: Transcript, logger: Logger):
+async def pipeline_remove_upload(session, transcript: Transcript, logger: Logger):
     # for future changes: note that there's also a consent process happens, beforehand and users may not consent with keeping files. currently, we delete regardless, so it's no need for that
     logger.info("Starting remove upload")
     uploads = transcript.data_path.glob("upload.*")

@@ -498,16 +461,14 @@ async def pipeline_remove_upload(transcript: Transcript, logger: Logger):
     logger.info("Remove upload done")


-@get_transcript
-async def pipeline_waveform(transcript: Transcript, logger: Logger):
+async def pipeline_waveform(session, transcript: Transcript, logger: Logger):
     logger.info("Starting waveform")
     runner = PipelineMainWaveform(transcript_id=transcript.id)
     await runner.run()
     logger.info("Waveform done")


-@get_transcript
-async def pipeline_convert_to_mp3(transcript: Transcript, logger: Logger):
+async def pipeline_convert_to_mp3(session, transcript: Transcript, logger: Logger):
     logger.info("Starting convert to mp3")

     # If the audio wav is not available, just skip

@@ -556,24 +517,21 @@ async def pipeline_upload_mp3(session, transcript: Transcript, logger: Logger):
     logger.info("Upload mp3 done")


-@get_transcript
-async def pipeline_diarization(transcript: Transcript, logger: Logger):
+async def pipeline_diarization(session, transcript: Transcript, logger: Logger):
     logger.info("Starting diarization")
     runner = PipelineMainDiarization(transcript_id=transcript.id)
     await runner.run()
     logger.info("Diarization done")


-@get_transcript
-async def pipeline_title(transcript: Transcript, logger: Logger):
+async def pipeline_title(session, transcript: Transcript, logger: Logger):
     logger.info("Starting title")
     runner = PipelineMainTitle(transcript_id=transcript.id)
     await runner.run()
     logger.info("Title done")


-@get_transcript
-async def pipeline_summaries(transcript: Transcript, logger: Logger):
+async def pipeline_summaries(session, transcript: Transcript, logger: Logger):
     logger.info("Starting summaries")
     runner = PipelineMainFinalSummaries(transcript_id=transcript.id)
     await runner.run()

@@ -707,26 +665,31 @@ async def pipeline_post_to_zulip(session, transcript: Transcript, logger: Logger
 # ===================================================================


-@shared_task
-@asynctask
-async def task_pipeline_remove_upload(*, transcript_id: str):
-    await pipeline_remove_upload(transcript_id=transcript_id)
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_remove_upload(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_remove_upload(session, transcript=transcript, logger=logger)


-@shared_task
-@asynctask
-async def task_pipeline_waveform(*, transcript_id: str):
-    await pipeline_waveform(transcript_id=transcript_id)
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_waveform(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_waveform(session, transcript=transcript, logger=logger)


-@shared_task
-@asynctask
-async def task_pipeline_convert_to_mp3(*, transcript_id: str):
-    await pipeline_convert_to_mp3(transcript_id=transcript_id)
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_convert_to_mp3(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_convert_to_mp3(session, transcript=transcript, logger=logger)


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session_and_transcript
 async def task_pipeline_upload_mp3(
     session, *, transcript: Transcript, logger: Logger, transcript_id: str

@@ -734,81 +697,93 @@
     await pipeline_upload_mp3(session, transcript=transcript, logger=logger)


-@shared_task
-@asynctask
-async def task_pipeline_diarization(*, transcript_id: str):
-    await pipeline_diarization(transcript_id=transcript_id)
-
-
-@shared_task
-@asynctask
-async def task_pipeline_title(*, transcript_id: str):
-    await pipeline_title(transcript_id=transcript_id)
-
-
-@shared_task
-@asynctask
-async def task_pipeline_final_summaries(*, transcript_id: str):
-    await pipeline_summaries(transcript_id=transcript_id)
-
-
-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session_and_transcript
-async def task_cleanup_consent(
+async def task_pipeline_diarization(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_diarization(session, transcript=transcript, logger=logger)
+
+
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_title(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_title(session, transcript=transcript, logger=logger)
+
+
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_final_summaries(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    await pipeline_summaries(session, transcript=transcript, logger=logger)
+
+
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_cleanup_consent(session, *, transcript: Transcript, logger: Logger):
     await cleanup_consent(session, transcript=transcript, logger=logger)


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session_and_transcript
 async def task_pipeline_post_to_zulip(
-    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+    session, *, transcript: Transcript, logger: Logger
 ):
     await pipeline_post_to_zulip(session, transcript=transcript, logger=logger)


-def pipeline_post(*, transcript_id: str):
-    """
-    Run the post pipeline
-    """
-    chain_mp3_and_diarize = (
-        task_pipeline_waveform.si(transcript_id=transcript_id)
-        | task_pipeline_convert_to_mp3.si(transcript_id=transcript_id)
-        | task_pipeline_upload_mp3.si(transcript_id=transcript_id)
-        | task_pipeline_remove_upload.si(transcript_id=transcript_id)
-        | task_pipeline_diarization.si(transcript_id=transcript_id)
-        | task_cleanup_consent.si(transcript_id=transcript_id)
-    )
-    chain_title_preview = task_pipeline_title.si(transcript_id=transcript_id)
-    chain_final_summaries = task_pipeline_final_summaries.si(
-        transcript_id=transcript_id
-    )
-
-    chain = chord(
-        group(chain_mp3_and_diarize, chain_title_preview),
-        chain_final_summaries,
-    ) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
-
-    return chain.delay()
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_cleanup_consent_taskiq(
+    session, *, transcript: Transcript, logger: Logger
+):
+    await cleanup_consent(session, transcript=transcript, logger=logger)
+
+
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_post_to_zulip_taskiq(
+    session, *, transcript: Transcript, logger: Logger
+):
+    await pipeline_post_to_zulip(session, transcript=transcript, logger=logger)
+
+
+async def pipeline_post(*, transcript_id: str):
+    await task_pipeline_post_sequential.kiq(transcript_id=transcript_id)
+
+
+@taskiq_broker.task
+async def task_pipeline_post_sequential(*, transcript_id: str):
+    await task_pipeline_waveform.kiq(transcript_id=transcript_id)
+    await task_pipeline_convert_to_mp3.kiq(transcript_id=transcript_id)
+    await task_pipeline_upload_mp3.kiq(transcript_id=transcript_id)
+    await task_pipeline_remove_upload.kiq(transcript_id=transcript_id)
+    await task_pipeline_diarization.kiq(transcript_id=transcript_id)
+    await task_cleanup_consent.kiq(transcript_id=transcript_id)
+
+    await asyncio.gather(
+        task_pipeline_title.kiq(transcript_id=transcript_id),
+        task_pipeline_final_summaries.kiq(transcript_id=transcript_id),
+    )
+
+    await task_pipeline_post_to_zulip.kiq(transcript_id=transcript_id)

-@get_transcript
-async def pipeline_process(transcript: Transcript, logger: Logger):
+async def pipeline_process(session, transcript: Transcript, logger: Logger):
     try:
         if transcript.audio_location == "storage":
-            async with get_session_factory()() as session:
-                await transcripts_controller.download_mp3_from_storage(transcript)
-                transcript.audio_waveform_filename.unlink(missing_ok=True)
-                await transcripts_controller.update(
-                    session,
-                    transcript,
-                    {
-                        "topics": [],
-                    },
-                )
+            await transcripts_controller.download_mp3_from_storage(transcript)
+            transcript.audio_waveform_filename.unlink(missing_ok=True)
+            await transcripts_controller.update(
+                session,
+                transcript,
+                {
+                    "topics": [],
+                },
+            )

         # open audio
         audio_filename = next(transcript.data_path.glob("upload.*"), None)

@@ -840,20 +815,21 @@ async def pipeline_process(transcript: Transcript, logger: Logger):

     except Exception as exc:
         logger.error("Pipeline error", exc_info=exc)
-        async with get_session_factory()() as session:
-            await transcripts_controller.update(
-                session,
-                transcript,
-                {
-                    "status": "error",
-                },
-            )
+        await transcripts_controller.update(
+            session,
+            transcript,
+            {
+                "status": "error",
+            },
+        )
         raise

     logger.info("Pipeline ended")


-@shared_task
-@asynctask
-async def task_pipeline_process(*, transcript_id: str):
-    return await pipeline_process(transcript_id=transcript_id)
+@taskiq_broker.task
+@with_session_and_transcript
+async def task_pipeline_process(
+    session, *, transcript: Transcript, logger: Logger, transcript_id: str
+):
+    return await pipeline_process(session, transcript=transcript, logger=logger)

@@ -248,15 +248,21 @@ class ICSFetchService:
                 )
                 att_data: AttendeeData = {
                     "email": clean_email,
-                    "name": att.params.get("CN")
-                    if hasattr(att, "params") and email == email_parts[0]
-                    else None,
-                    "status": att.params.get("PARTSTAT")
-                    if hasattr(att, "params") and email == email_parts[0]
-                    else None,
-                    "role": att.params.get("ROLE")
-                    if hasattr(att, "params") and email == email_parts[0]
-                    else None,
+                    "name": (
+                        att.params.get("CN")
+                        if hasattr(att, "params") and email == email_parts[0]
+                        else None
+                    ),
+                    "status": (
+                        att.params.get("PARTSTAT")
+                        if hasattr(att, "params") and email == email_parts[0]
+                        else None
+                    ),
+                    "role": (
+                        att.params.get("ROLE")
+                        if hasattr(att, "params") and email == email_parts[0]
+                        else None
+                    ),
                 }
                 final_attendees.append(att_data)
             else:

@@ -264,9 +270,9 @@ class ICSFetchService:
                 att_data: AttendeeData = {
                     "email": email_str,
                     "name": att.params.get("CN") if hasattr(att, "params") else None,
-                    "status": att.params.get("PARTSTAT")
-                    if hasattr(att, "params")
-                    else None,
+                    "status": (
+                        att.params.get("PARTSTAT") if hasattr(att, "params") else None
+                    ),
                     "role": att.params.get("ROLE") if hasattr(att, "params") else None,
                 }
                 final_attendees.append(att_data)

@@ -281,9 +287,9 @@ class ICSFetchService:
                 )
                 org_data: AttendeeData = {
                     "email": org_email,
-                    "name": organizer.params.get("CN")
-                    if hasattr(organizer, "params")
-                    else None,
+                    "name": (
+                        organizer.params.get("CN") if hasattr(organizer, "params") else None
+                    ),
                     "role": "ORGANIZER",
                 }
                 final_attendees.append(org_data)
@@ -9,11 +9,10 @@ async def export_db(filename: str) -> None:
     filename = pathlib.Path(filename).resolve()
     settings.DATABASE_URL = f"sqlite:///{filename}"

-    from reflector.db import get_session_factory
+    from reflector.db import get_session_context
    from reflector.db.transcripts import transcripts_controller

-    session_factory = get_session_factory()
-    async with session_factory() as session:
+    async with get_session_context() as session:
         transcripts = await transcripts_controller.get_all(session)


 def export_transcript(transcript, output_dir):
@@ -8,11 +8,10 @@ async def export_db(filename: str) -> None:
     filename = pathlib.Path(filename).resolve()
     settings.DATABASE_URL = f"sqlite:///{filename}"

-    from reflector.db import get_session_factory
+    from reflector.db import get_session_context
     from reflector.db.transcripts import transcripts_controller

-    session_factory = get_session_factory()
-    async with session_factory() as session:
+    async with get_session_context() as session:
         transcripts = await transcripts_controller.get_all(session)


 def export_transcript(transcript):

@@ -7,13 +7,12 @@ import asyncio
 import json
 import shutil
 import sys
-import time
 from pathlib import Path
 from typing import Any, Dict, List, Literal

 from sqlalchemy.ext.asyncio import AsyncSession

-from reflector.db import get_session_factory
+from reflector.db import get_session_context
 from reflector.db.transcripts import SourceKind, TranscriptTopic, transcripts_controller
 from reflector.logger import logger
 from reflector.pipelines.main_file_pipeline import (
@@ -140,13 +139,7 @@ async def process_live_pipeline(
     # assert documented behaviour: after process, the pipeline isn't ended. this is the reason of calling pipeline_post
     assert pre_final_transcript.status != "ended"

-    # at this point, diarization is running but we have no access to it. run diarization in parallel - one will hopefully win after polling
-    result = live_pipeline_post(transcript_id=transcript_id)
-
-    # result.ready() blocks even without await; it mutates result also
-    while not result.ready():
-        print(f"Status: {result.state}")
-        time.sleep(2)
+    await live_pipeline_post(transcript_id=transcript_id)


 async def process_file_pipeline(
@@ -154,13 +147,7 @@ async def process_file_pipeline(
 ):
     """Process audio/video file using the optimized file pipeline"""

-    # task_pipeline_file_process is a Celery task, need to use .delay() for async execution
-    result = task_pipeline_file_process.delay(transcript_id=transcript_id)
-
-    # Wait for the Celery task to complete
-    while not result.ready():
-        print(f"File pipeline status: {result.state}", file=sys.stderr)
-        time.sleep(2)
+    await task_pipeline_file_process.kiq(transcript_id=transcript_id)

     logger.info("File pipeline processing complete")
@@ -172,8 +159,7 @@ async def process(
     pipeline: Literal["live", "file"],
     output_path: str = None,
 ):
-    session_factory = get_session_factory()
-    async with session_factory() as session:
+    async with get_session_context() as session:
         transcript_id = await prepare_entry(
             session,
             source_path,

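One behavioural change to flag in this script: the old code polled `result.ready()` until the Celery task finished, while `.kiq()` returns as soon as the message is enqueued, so "File pipeline processing complete" now logs before the work is done. If the CLI should still block until completion, TaskIQ's result handle can reproduce the old polling. A sketch, assuming the Redis result backend is configured (timeout values illustrative):

```python
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process


async def process_file_pipeline_blocking(transcript_id: str) -> None:
    # kiq() returns an AsyncTaskiqTask handle; wait_result() polls the result backend,
    # mirroring the old result.ready() loop
    task = await task_pipeline_file_process.kiq(transcript_id=transcript_id)
    result = await task.wait_result(check_interval=2, timeout=600)
    if result.is_err:
        raise RuntimeError(f"file pipeline failed for {transcript_id}")
```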
@@ -1,14 +1,10 @@
 import argparse
 import asyncio

-from reflector.app import celery_app  # noqa
-from reflector.pipelines.main_live_pipeline import task_pipeline_main_post
+from reflector.pipelines.main_live_pipeline import pipeline_post

 parser = argparse.ArgumentParser()
 parser.add_argument("transcript_id", type=str)
-parser.add_argument("--delay", action="store_true")
 args = parser.parse_args()

-if args.delay:
-    task_pipeline_main_post.delay(args.transcript_id)
-else:
-    task_pipeline_main_post(args.transcript_id)
+asyncio.run(pipeline_post(transcript_id=args.transcript_id))

@@ -1,6 +1,5 @@
 from typing import Annotated, Optional

-import celery
 from fastapi import APIRouter, Depends, HTTPException
 from pydantic import BaseModel
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -36,24 +35,6 @@ async def transcript_process(
             status_code=400, detail="Recording is not ready for processing"
         )

-    if task_is_scheduled_or_active(
-        "reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
-        transcript_id=transcript_id,
-    ):
-        return ProcessStatus(status="already running")
-
     # schedule a background task process the file
-    task_pipeline_file_process.delay(transcript_id=transcript_id)
+    await task_pipeline_file_process.kiq(transcript_id=transcript_id)

     return ProcessStatus(status="ok")


-def task_is_scheduled_or_active(task_name: str, **kwargs):
-    inspect = celery.current_app.control.inspect()
-
-    for worker, tasks in (inspect.scheduled() | inspect.active()).items():
-        for task in tasks:
-            if task["name"] == task_name and task["kwargs"] == kwargs:
-                return True
-
-    return False

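This hunk drops the "already running" guard outright: TaskIQ has no equivalent of Celery's `inspect()` worker introspection, so deduplication would have to move into application state. One hedged option, assuming `get_redis_client()` (already imported by `reflector/worker/process.py`) hands back an async redis client; key name and TTL are illustrative:

```python
from reflector.redis_cache import get_redis_client


async def enqueue_once(transcript_id: str) -> ProcessStatus:
    # SET NX: only the first concurrent caller gets a truthy reply;
    # the key expires after 10 minutes so a crashed run cannot wedge the guard
    redis = get_redis_client()
    if not await redis.set(f"processing:{transcript_id}", "1", nx=True, ex=600):
        return ProcessStatus(status="already running")
    await task_pipeline_file_process.kiq(transcript_id=transcript_id)
    return ProcessStatus(status="ok")
```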
@@ -94,7 +94,6 @@ async def transcript_record_upload(
     # set the status to "uploaded"
     await transcripts_controller.update(session, transcript, {"status": "uploaded"})

     # launch a background task to process the file
-    task_pipeline_file_process.delay(transcript_id=transcript_id)
+    await task_pipeline_file_process.kiq(transcript_id=transcript_id)

     return UploadStatus(status="ok")

@@ -4,8 +4,10 @@ Transcripts websocket API

 """

-from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
+from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
+from sqlalchemy.ext.asyncio import AsyncSession

+from reflector.db import get_session
 from reflector.db.transcripts import transcripts_controller
 from reflector.ws_manager import get_ws_manager
@@ -21,6 +23,7 @@ async def transcript_get_websocket_events(transcript_id: str):
 async def transcript_events_websocket(
     transcript_id: str,
     websocket: WebSocket,
+    session: AsyncSession = Depends(get_session),
     # user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
 ):
     # user_id = user["sub"] if user else None

@@ -1,68 +1,21 @@
-import celery
+import os
+
 import structlog
-from celery import Celery
-from celery.schedules import crontab
+from taskiq import InMemoryBroker
+from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

 from reflector.settings import settings

 logger = structlog.get_logger(__name__)

-if celery.current_app.main != "default":
-    logger.info(f"Celery already configured ({celery.current_app})")
-    app = celery.current_app
-else:
-    app = Celery(__name__)
-    app.conf.broker_url = settings.CELERY_BROKER_URL
-    app.conf.result_backend = settings.CELERY_RESULT_BACKEND
-    app.conf.broker_connection_retry_on_startup = True
-    app.autodiscover_tasks(
-        [
-            "reflector.pipelines.main_live_pipeline",
-            "reflector.worker.healthcheck",
-            "reflector.worker.process",
-            "reflector.worker.cleanup",
-            "reflector.worker.ics_sync",
-        ]
-    )
-
-# crontab
-app.conf.beat_schedule = {
-    "process_messages": {
-        "task": "reflector.worker.process.process_messages",
-        "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
-    },
-    "process_meetings": {
-        "task": "reflector.worker.process.process_meetings",
-        "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS),
-    },
-    "reprocess_failed_recordings": {
-        "task": "reflector.worker.process.reprocess_failed_recordings",
-        "schedule": crontab(hour=5, minute=0),  # Midnight EST
-    },
-    "sync_all_ics_calendars": {
-        "task": "reflector.worker.ics_sync.sync_all_ics_calendars",
-        "schedule": 60.0,  # Run every minute to check which rooms need sync
-    },
-    "create_upcoming_meetings": {
-        "task": "reflector.worker.ics_sync.create_upcoming_meetings",
-        "schedule": 30.0,  # Run every 30 seconds to create upcoming meetings
-    },
-}
-
-if settings.PUBLIC_MODE:
-    app.conf.beat_schedule["cleanup_old_public_data"] = {
-        "task": "reflector.worker.cleanup.cleanup_old_public_data_task",
-        "schedule": crontab(hour=3, minute=0),
-    }
-    logger.info(
-        "Public mode cleanup enabled",
-        retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
-    )
-
-if settings.HEALTHCHECK_URL:
-    app.conf.beat_schedule["healthcheck_ping"] = {
-        "task": "reflector.worker.healthcheck.healthcheck_ping",
-        "schedule": 60.0 * 10,
-    }
-    logger.info("Healthcheck enabled", url=settings.HEALTHCHECK_URL)
-else:
-    logger.warning("Healthcheck disabled, no url configured")
+env = os.environ.get("ENVIRONMENT")
+if env and env == "pytest":
+    taskiq_broker = InMemoryBroker(await_inplace=True)
+else:
+    result_backend = RedisAsyncResultBackend(
+        redis_url=settings.CELERY_BROKER_URL,
+        result_ex_time=86400,
+    )
+    taskiq_broker = RedisStreamBroker(
+        url=settings.CELERY_BROKER_URL,
+    ).with_result_backend(result_backend)

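Note that the whole Celery Beat schedule is deleted here with no TaskIQ replacement in this compare (Phase 4 of the plan below tracks it). TaskIQ ships a scheduler driven by task labels; a sketch of what the replacement could look like, assuming the stock `TaskiqScheduler` with `LabelScheduleSource` and an illustrative task name:

```python
from taskiq.schedule_sources import LabelScheduleSource
from taskiq.scheduler import TaskiqScheduler

from reflector.worker.app import taskiq_broker

# run with: taskiq scheduler <module>:scheduler
scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)


# tasks opt in through the `schedule` label, e.g. the old crontab(hour=5, minute=0):
@taskiq_broker.task(schedule=[{"cron": "0 5 * * *"}])
async def reprocess_failed_recordings_scheduled() -> None:  # illustrative
    ...
```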
@@ -9,16 +9,15 @@ from datetime import datetime, timedelta, timezone
 from typing import TypedDict

 import structlog
-from celery import shared_task
 from pydantic.types import PositiveInt
 from sqlalchemy import delete, select
 from sqlalchemy.ext.asyncio import AsyncSession

-from reflector.asynctask import asynctask
 from reflector.db.base import MeetingModel, RecordingModel, TranscriptModel
 from reflector.db.transcripts import transcripts_controller
 from reflector.settings import settings
 from reflector.storage import get_recordings_storage
+from reflector.worker.app import taskiq_broker
 from reflector.worker.session_decorator import with_session

 logger = structlog.get_logger(__name__)
@@ -156,11 +155,7 @@ async def cleanup_old_public_data(
     return stats


-@shared_task(
-    autoretry_for=(Exception,),
-    retry_kwargs={"max_retries": 3, "countdown": 300},
-)
-@asynctask
+@taskiq_broker.task
 @with_session
 async def cleanup_old_public_data_task(session: AsyncSession, days: int | None = None):
     await cleanup_old_public_data(session, days=days)

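The Celery `autoretry_for` / 300-second `countdown` settings vanish in this hunk, and `@taskiq_broker.task` as written retries nothing by itself. With `SimpleRetryMiddleware` on the broker (see `taskiq_broker.py` below), per-task opt-in would look roughly like this; the label names are taskiq's documented ones, but there is no stock equivalent of the countdown delay:

```python
# sketch: retry opt-in under SimpleRetryMiddleware (retries are immediate, no countdown)
@taskiq_broker.task(retry_on_error=True, max_retries=3)
@with_session
async def cleanup_old_public_data_task(session: AsyncSession, days: int | None = None):
    await cleanup_old_public_data(session, days=days)
```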
@@ -1,13 +1,13 @@
 import httpx
 import structlog
-from celery import shared_task

 from reflector.settings import settings
+from reflector.worker.app import taskiq_broker

 logger = structlog.get_logger(__name__)


-@shared_task
+@taskiq_broker.task
 def healthcheck_ping():
     url = settings.HEALTHCHECK_URL
     if not url:

@@ -1,24 +1,21 @@
 from datetime import datetime, timedelta, timezone

 import structlog
-from celery import shared_task
-from celery.utils.log import get_task_logger
 from sqlalchemy.ext.asyncio import AsyncSession

-from reflector.asynctask import asynctask
 from reflector.db.calendar_events import calendar_events_controller
 from reflector.db.meetings import meetings_controller
 from reflector.db.rooms import rooms_controller
 from reflector.redis_cache import RedisAsyncLock
 from reflector.services.ics_sync import SyncStatus, ics_sync_service
 from reflector.whereby import create_meeting, upload_logo
+from reflector.worker.app import taskiq_broker
 from reflector.worker.session_decorator import with_session

-logger = structlog.wrap_logger(get_task_logger(__name__))
+logger = structlog.get_logger(__name__)


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def sync_room_ics(session: AsyncSession, room_id: str):
     try:
@@ -56,8 +53,7 @@ async def sync_room_ics(session: AsyncSession, room_id: str):
         logger.error("Unexpected error during ICS sync", room_id=room_id, error=str(e))


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def sync_all_ics_calendars(session: AsyncSession):
     try:
@@ -71,7 +67,7 @@ async def sync_all_ics_calendars(session: AsyncSession):
             logger.debug("Skipping room, not time to sync yet", room_id=room.id)
             continue

-        sync_room_ics.delay(room.id)
+        await sync_room_ics.kiq(room.id)

     logger.info("Queued sync tasks for all eligible rooms")
@@ -151,8 +147,7 @@ async def create_upcoming_meetings_for_event(
     )


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def create_upcoming_meetings(session: AsyncSession):
     async with RedisAsyncLock("create_upcoming_meetings", skip_if_locked=True) as lock:

@@ -6,8 +6,6 @@ from urllib.parse import unquote
 import av
 import boto3
 import structlog
-from celery import shared_task
-from celery.utils.log import get_task_logger
 from pydantic import ValidationError
 from redis.exceptions import LockError
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -17,13 +15,13 @@ from reflector.db.recordings import Recording, recordings_controller
 from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import SourceKind, transcripts_controller
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
-from reflector.pipelines.main_live_pipeline import asynctask
 from reflector.redis_cache import get_redis_client
 from reflector.settings import settings
 from reflector.whereby import get_room_sessions
+from reflector.worker.app import taskiq_broker
 from reflector.worker.session_decorator import with_session

-logger = structlog.wrap_logger(get_task_logger(__name__))
+logger = structlog.get_logger(__name__)


 def parse_datetime_with_timezone(iso_string: str) -> datetime:
@@ -34,8 +32,8 @@ def parse_datetime_with_timezone(iso_string: str) -> datetime:
     return dt


-@shared_task
-def process_messages():
+@taskiq_broker.task
+async def process_messages():
     queue_url = settings.AWS_PROCESS_RECORDING_QUEUE_URL
     if not queue_url:
         logger.warning("No process recording queue url")
@@ -66,7 +64,7 @@ def process_messages():
             if record["eventName"].startswith("ObjectCreated"):
                 bucket = record["s3"]["bucket"]["name"]
                 key = unquote(record["s3"]["object"]["key"])
-                process_recording.delay(bucket, key)
+                await process_recording.kiq(bucket, key)

             sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
             logger.info("Processed and deleted message: %s", message)
@@ -75,8 +73,7 @@ def process_messages():
         logger.error("process_messages", error=str(e))


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def process_recording(session: AsyncSession, bucket_name: str, object_key: str):
     logger.info("Processing recording: %s/%s", bucket_name, object_key)
@@ -155,11 +152,10 @@ async def process_recording(session: AsyncSession, bucket_name: str, object_key:

     await transcripts_controller.update(session, transcript, {"status": "uploaded"})

-    task_pipeline_file_process.delay(transcript_id=transcript.id)
+    await task_pipeline_file_process.kiq(transcript_id=transcript.id)


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def process_meetings(session: AsyncSession):
     """
@@ -253,8 +249,7 @@ async def process_meetings(session: AsyncSession):
     )


-@shared_task
-@asynctask
+@taskiq_broker.task
 @with_session
 async def reprocess_failed_recordings(session: AsyncSession):
     """
@@ -291,7 +286,7 @@ async def reprocess_failed_recordings(session: AsyncSession):
     )
     if not recording:
         logger.info(f"Queueing recording for processing: {object_key}")
-        process_recording.delay(bucket_name, object_key)
+        await process_recording.kiq(bucket_name, object_key)
         reprocessed_count += 1
         continue
@@ -310,7 +305,7 @@ async def reprocess_failed_recordings(session: AsyncSession):

     if transcript is None or transcript.status == "error":
         logger.info(f"Queueing recording for processing: {object_key}")
-        process_recording.delay(bucket_name, object_key)
+        await process_recording.kiq(bucket_name, object_key)
         reprocessed_count += 1

 except Exception as e:

@@ -8,9 +8,7 @@ that stays open for the entire duration of the task execution.
 import functools
 from typing import Any, Callable, TypeVar

-from celery import current_task
-
-from reflector.db import get_session_factory
+from reflector.db import get_session_context
 from reflector.db.transcripts import transcripts_controller
 from reflector.logger import logger
@@ -21,12 +19,11 @@ def with_session(func: F) -> F:
     """
     Decorator that provides an AsyncSession as the first argument to the decorated function.

-    This should be used AFTER the @asynctask decorator on Celery tasks to ensure
-    proper session management throughout the task execution.
+    This should be used with TaskIQ tasks to ensure proper session management
+    throughout the task execution.

     Example:
-        @shared_task
-        @asynctask
+        @taskiq_broker.task
         @with_session
         async def my_task(session: AsyncSession, arg1: str, arg2: int):
             # session is automatically provided and managed
@@ -36,11 +33,9 @@ def with_session(func: F) -> F:

     @functools.wraps(func)
     async def wrapper(*args, **kwargs):
-        session_factory = get_session_factory()
-        async with session_factory() as session:
-            async with session.begin():
-                # Pass session as first argument to the decorated function
-                return await func(session, *args, **kwargs)
+        async with get_session_context() as session:
+            # Pass session as first argument to the decorated function
+            return await func(session, *args, **kwargs)

     return wrapper
@@ -56,11 +51,10 @@ def with_session_and_transcript(func: F) -> F:
     4. Creates an enhanced logger with Celery task context
     5. Passes session, transcript, and logger to the decorated function

-    This should be used AFTER the @asynctask decorator on Celery tasks.
+    This should be used with TaskIQ tasks.

     Example:
-        @shared_task
-        @asynctask
+        @taskiq_broker.task
         @with_session_and_transcript
         async def my_task(session: AsyncSession, transcript: Transcript, logger: Logger, arg1: str):
             # session, transcript, and logger are automatically provided
@@ -76,34 +70,40 @@ def with_session_and_transcript(func: F) -> F:
                 "transcript_id is required for @with_session_and_transcript"
             )

-        session_factory = get_session_factory()
-        async with session_factory() as session:
-            async with session.begin():
-                # Fetch the transcript
-                transcript = await transcripts_controller.get_by_id(
-                    session, transcript_id
-                )
-                if not transcript:
-                    raise Exception(f"Transcript {transcript_id} not found")
-
-                # Create enhanced logger with Celery task context
-                tlogger = logger.bind(transcript_id=transcript.id)
-                if current_task:
-                    tlogger = tlogger.bind(
-                        task_id=current_task.request.id,
-                        task_name=current_task.name,
-                        worker_hostname=current_task.request.hostname,
-                        task_retries=current_task.request.retries,
-                        transcript_id=transcript_id,
-                    )
-
-                try:
-                    # Pass session, transcript, and logger to the decorated function
-                    return await func(
-                        session, transcript=transcript, logger=tlogger, *args, **kwargs
-                    )
-                except Exception:
-                    tlogger.exception("Error in task execution")
-                    raise
+        async with get_session_context() as session:
+            transcript = await transcripts_controller.get_by_id(session, transcript_id)
+            if not transcript:
+                raise Exception(f"Transcript {transcript_id} not found")
+
+            tlogger = logger.bind(transcript_id=transcript.id)
+
+            try:
+                return await func(
+                    session, transcript=transcript, logger=tlogger, *args, **kwargs
+                )
+            except Exception:
+                tlogger.exception("Error in task execution")
+                raise

     return wrapper


+def catch_exception(func: F) -> F:
+    """
+    Decorator that catches exceptions and logs them using structlog.
+    """
+
+    @functools.wraps(func)
+    async def wrapper(*args, **kwargs):
+        try:
+            return await func(*args, **kwargs)
+        except Exception:
+            logger.exception(
+                "Exception caught in function execution",
+                func=func.__name__,
+                args=args,
+                kwargs=kwargs,
+            )
+            raise
+
+    return wrapper

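`get_session_context()` is called throughout this compare but its definition is not in any visible hunk. From the call sites it has to be an async context manager yielding an `AsyncSession`; since the old code paired the session with `session.begin()`, a plausible sketch is the two folded together (everything beyond `get_session_factory` is an assumption):

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession

from reflector.db import get_session_factory


@asynccontextmanager
async def get_session_context() -> AsyncIterator[AsyncSession]:
    # open a session plus a transaction: commit on success, roll back on error
    session_factory = get_session_factory()
    async with session_factory() as session:
        async with session.begin():
            yield session
```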
server/reflector/worker/taskiq_broker.py (new file, 76 lines)
@@ -0,0 +1,76 @@
"""
TaskIQ broker configuration for Reflector.

This module provides a production-ready TaskIQ broker configuration that handles
both test and production environments correctly. It includes retry middleware
for 1:1 parity with Celery and proper logging setup.
"""

import os

import structlog
from taskiq import InMemoryBroker
from taskiq.middlewares import SimpleRetryMiddleware
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

from reflector.settings import settings

logger = structlog.get_logger(__name__)


def create_taskiq_broker():
    """
    Create and configure the TaskIQ broker based on environment.

    Returns:
        Configured TaskIQ broker instance with appropriate backend and middleware.
    """
    env = os.environ.get("ENVIRONMENT")

    if env == "pytest":
        # Test environment: Use InMemoryBroker with immediate execution
        logger.info("Configuring TaskIQ InMemoryBroker for test environment")
        broker = InMemoryBroker(await_inplace=True)
    else:
        # Production environment: Use Redis broker with result backend
        logger.info(
            "Configuring TaskIQ RedisStreamBroker for production environment",
            redis_url=settings.CELERY_BROKER_URL,
        )

        # Configure Redis result backend
        result_backend = RedisAsyncResultBackend(
            redis_url=settings.CELERY_BROKER_URL,
            result_ex_time=86400,  # Results expire after 24 hours
        )

        # Configure Redis stream broker
        broker = RedisStreamBroker(
            url=settings.CELERY_BROKER_URL,
            stream_name="taskiq:stream",  # Custom stream name for clarity
            consumer_group="taskiq:workers",  # Consumer group for load balancing
        ).with_result_backend(result_backend)

    # Add retry middleware for production parity with Celery
    # This provides automatic retries on task failures
    retry_middleware = SimpleRetryMiddleware(
        default_retry_count=3,  # Match Celery's default retry behavior
    )
    broker.add_middlewares(retry_middleware)

    logger.info(
        "TaskIQ broker configured successfully",
        broker_type=type(broker).__name__,
        has_result_backend=hasattr(broker, "_result_backend"),
        middleware_count=len(broker.middlewares),
    )

    return broker


# Create the global broker instance
taskiq_broker = create_taskiq_broker()

# Export the broker for use in task definitions
__all__ = ["taskiq_broker"]
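One operational detail this module does not cover: code that calls `.kiq()` from the API process, as several routers above now do, needs the broker started in that process too (workers get this from `taskiq worker reflector.worker.taskiq_broker:taskiq_broker <task modules...>`, but a web app does not). A sketch of wiring it into a FastAPI lifespan; nothing here is taken from Reflector's actual app factory:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from reflector.worker.taskiq_broker import taskiq_broker


@asynccontextmanager
async def lifespan(app: FastAPI):
    # outside a `taskiq worker` process the broker must be started manually
    await taskiq_broker.startup()
    yield
    await taskiq_broker.shutdown()


app = FastAPI(lifespan=lifespan)
```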
@@ -8,18 +8,16 @@ from datetime import datetime, timezone

 import httpx
 import structlog
-from celery import shared_task
-from celery.utils.log import get_task_logger
 from sqlalchemy.ext.asyncio import AsyncSession

 from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import transcripts_controller
-from reflector.pipelines.main_live_pipeline import asynctask
 from reflector.settings import settings
 from reflector.utils.webvtt import topics_to_webvtt
+from reflector.worker.app import taskiq_broker
 from reflector.worker.session_decorator import with_session

-logger = structlog.wrap_logger(get_task_logger(__name__))
+logger = structlog.get_logger(__name__)


 def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
@@ -33,30 +31,23 @@ def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> s
     return hmac_obj.hexdigest()


-@shared_task(
-    bind=True,
-    max_retries=30,
-    default_retry_delay=60,
-    retry_backoff=True,
-    retry_backoff_max=3600,  # Max 1 hour between retries
-)
-@asynctask
+@taskiq_broker.task
 @with_session
-async def send_transcript_webhook(
-    self,
+async def send_transcript_webhook_taskiq(
     transcript_id: str,
     room_id: str,
     event_id: str,
     session: AsyncSession,
 ):
+    retry_count = 0
+
     log = logger.bind(
         transcript_id=transcript_id,
         room_id=room_id,
-        retry_count=self.request.retries,
+        retry_count=retry_count,
     )

     try:
         # Fetch transcript and room
         transcript = await transcripts_controller.get_by_id(session, transcript_id)
         if not transcript:
             log.error("Transcript not found, skipping webhook")
@@ -71,11 +62,9 @@ async def send_transcript_webhook(
             log.info("No webhook URL configured for room, skipping")
             return

-        # Generate WebVTT content from topics
         topics_data = []

         if transcript.topics:
-            # Build topics data with diarized content per topic
             for topic in transcript.topics:
                 topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
                 topics_data.append(
@@ -88,7 +77,6 @@ async def send_transcript_webhook(
                 }
             )

-        # Build webhook payload
         frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
         participants = [
             {"id": p.id, "name": p.name, "speaker": p.speaker}
@@ -120,16 +108,14 @@ async def send_transcript_webhook(
             },
         }

-        # Convert to JSON
         payload_json = json.dumps(payload_data, separators=(",", ":"))
         payload_bytes = payload_json.encode("utf-8")

-        # Generate signature if secret is configured
         headers = {
             "Content-Type": "application/json",
             "User-Agent": "Reflector-Webhook/1.0",
             "X-Webhook-Event": "transcript.completed",
-            "X-Webhook-Retry": str(self.request.retries),
+            "X-Webhook-Retry": str(retry_count),
         }

         if room.webhook_secret:
@@ -139,7 +125,6 @@ async def send_transcript_webhook(
             )
             headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"

-        # Send webhook with timeout
         async with httpx.AsyncClient(timeout=30.0) as client:
             log.info(
                 "Sending webhook",
@@ -165,26 +150,22 @@ async def send_transcript_webhook(
         log.error(
             "Webhook failed with HTTP error",
             status_code=e.response.status_code,
-            response_text=e.response.text[:500],  # First 500 chars
+            response_text=e.response.text[:500],
         )

         # Don't retry on client errors (4xx)
         if 400 <= e.response.status_code < 500:
             log.error("Client error, not retrying")
             return

         # Retry on server errors (5xx)
-        raise self.retry(exc=e)
+        raise

     except (httpx.ConnectError, httpx.TimeoutException) as e:
         # Retry on network errors
         log.error("Webhook failed with connection error", error=str(e))
-        raise self.retry(exc=e)
+        raise

     except Exception as e:
         # Retry on unexpected errors
         log.exception("Unexpected error in webhook task", error=str(e))
-        raise self.retry(exc=e)
+        raise


 async def test_webhook(room_id: str) -> dict:

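The retry story is the biggest parity loss in this hunk: Celery gave this task 30 attempts with exponential backoff capped at an hour, while the TaskIQ version hardcodes `retry_count = 0` and just re-raises, leaving retries to the broker middleware. The early `return` on 4xx still works as intended, since no exception propagates for the middleware to retry. With `SimpleRetryMiddleware` installed, per-task opt-in would look roughly like this (label names are taskiq's documented ones; matching Celery's backoff would need a custom middleware):

```python
# sketch: 30 immediate retries via SimpleRetryMiddleware labels, no backoff
@taskiq_broker.task(retry_on_error=True, max_retries=30)
@with_session
async def send_transcript_webhook_taskiq(
    transcript_id: str,
    room_id: str,
    event_id: str,
    session: AsyncSession,
):
    ...
```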
server/taskiq_migration_plan.md (new file, 86 lines)
@@ -0,0 +1,86 @@
# TaskIQ Migration Implementation Plan

## Phase 1: Core Infrastructure Setup

### 1.1 Create TaskIQ Broker Configuration
- [ ] Create `reflector/worker/taskiq_broker.py` with broker setup
- [ ] Configure Redis broker with proper connection pooling
- [ ] Add retry middleware for 1:1 parity with Celery
- [ ] Setup test/production environment detection

### 1.2 Session Management Utilities
- [ ] Create `get_session_context()` function in `reflector/db.py`
- [ ] Ensure `@with_session` decorator works with TaskIQ
- [ ] Verify test mocking works with new session approach

## Phase 2: Simple Task Migration (Start Small)

### 2.1 Migrate Single Tasks First
- [ ] `reflector/worker/cleanup.py` - 1 task, simple logic
- [ ] `reflector/worker/webhook.py` - 1 task with retry logic
- [ ] Test each migrated task individually

### 2.2 Create Dual-Mode Tasks
- [ ] Keep Celery version with `@shared_task`
- [ ] Add TaskIQ version without `@asynctask`
- [ ] Use feature flag to switch between versions

## Phase 3: Complex Pipeline Migration

### 3.1 File Processing Pipeline
- [ ] Migrate `task_pipeline_file_process` completely
- [ ] Handle all sub-tasks in the pipeline
- [ ] Migrate chain/group/chord patterns to TaskIQ

### 3.2 Live Processing Pipeline
- [ ] Migrate all 10 tasks in `main_live_pipeline.py`
- [ ] Convert complex chord patterns
- [ ] Ensure WebSocket notifications still work

## Phase 4: Scheduled Tasks Migration

### 4.1 Convert Celery Beat to TaskIQ Scheduler
- [ ] Create `reflector/worker/scheduler.py`
- [ ] Migrate all scheduled tasks
- [ ] Setup TaskIQ scheduler service

## Phase 5: Testing Infrastructure

### 5.1 Update Test Fixtures
- [ ] Create TaskIQ test fixtures in `conftest.py`
- [ ] Ensure dual-mode testing (both Celery and TaskIQ)
- [ ] Verify all existing tests pass

### 5.2 Migration-Specific Tests
- [ ] Test session management across tasks
- [ ] Test retry logic parity
- [ ] Test scheduled task execution

## Phase 6: Deployment & Monitoring

### 6.1 Update Deployment Scripts
- [ ] Update Docker configurations
- [ ] Create TaskIQ worker startup scripts
- [ ] Setup health checks for TaskIQ

### 6.2 Monitoring Setup
- [ ] Create TaskIQ metrics collection
- [ ] Setup alerting for failed tasks
- [ ] Create migration rollback plan

## Execution Order

1. **Week 1**: Phase 1 + Phase 2.1
2. **Week 2**: Phase 2.2 + Phase 3.1
3. **Week 3**: Phase 3.2 + Phase 4
4. **Week 4**: Phase 5
5. **Week 5**: Phase 6 + Testing
6. **Week 6**: Cutover + Monitoring

## Success Metrics

- All tests passing with TaskIQ
- No performance degradation
- Successful parallel running for 1 week
- Zero data loss during migration
- Rollback tested and documented
@@ -1,7 +1,6 @@
 import asyncio
 import os
 import sys
-from tempfile import NamedTemporaryFile
 from unittest.mock import patch

 import pytest
@@ -322,26 +321,83 @@ async def dummy_storage():
     yield


-@pytest.fixture(scope="session")
-def celery_enable_logging():
-    return True
+# from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
+# from sqlalchemy.orm import sessionmaker


-@pytest.fixture(scope="session")
-def celery_config():
-    with NamedTemporaryFile() as f:
-        yield {
-            "broker_url": "memory://",
-            "result_backend": f"db+sqlite:///{f.name}",
-        }
+# @pytest.fixture()
+# async def db_connection(sqla_engine):
+#     connection = await sqla_engine.connect()
+#     try:
+#         yield connection
+#     finally:
+#         await connection.close()


-@pytest.fixture(scope="session")
-def celery_includes():
-    return [
-        "reflector.pipelines.main_live_pipeline",
-        "reflector.pipelines.main_file_pipeline",
-    ]
+# @pytest.fixture()
+# async def db_session_maker(db_connection):
+#     Session = async_sessionmaker(
+#         db_connection,
+#         expire_on_commit=False,
+#         class_=AsyncSession,
+#     )
+#     yield Session


+# @pytest.fixture()
+# async def db_session(db_session_maker, db_connection):
+#     """
+#     Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it
+#     after the test completes.
+#     """
+#     session = db_session_maker(
+#         bind=db_connection,
+#         join_transaction_mode="create_savepoint",
+#     )
+
+#     try:
+#         yield session
+#     finally:
+#         await session.close()


+# @pytest.fixture(autouse=True)
+# async def ensure_db_session_in_app(db_connection, db_session_maker):
+#     async def mock_get_session():
+#         session = db_session_maker(
+#             bind=db_connection, join_transaction_mode="create_savepoint"
+#         )
+
+#         try:
+#             yield session
+#         finally:
+#             await session.close()

+#     with patch("reflector.db._get_session", side_effect=mock_get_session):
+#         yield


+# @pytest.fixture()
+# async def db_session(sqla_engine):
+#     """
+#     Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it
+#     after the test completes.
+#     """
+#     from sqlalchemy.ext.asyncio import AsyncSession
+#     from sqlalchemy.orm import sessionmaker

+#     connection = await sqla_engine.connect()
+#     trans = await connection.begin()

+#     Session = sessionmaker(connection, expire_on_commit=False, class_=AsyncSession)
+#     session = Session()

+#     try:
+#         yield session
+#     finally:
+#         await session.close()
+#         await trans.rollback()
+#         await connection.close()


 @pytest.fixture(autouse=True)
@@ -372,6 +428,18 @@ def fake_mp3_upload():
     yield


+@pytest.fixture
+async def taskiq_broker():
+    from reflector.worker.app import taskiq_broker
+
+    await taskiq_broker.startup()
+
+    try:
+        yield taskiq_broker
+    finally:
+        await taskiq_broker.shutdown()


 @pytest.fixture
 async def fake_transcript_with_topics(tmpdir, client, db_session):
     import shutil

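Because `ENVIRONMENT=pytest` makes this an `InMemoryBroker(await_inplace=True)`, every `.kiq()` runs the task body before returning, and `wait_all()` is mostly a safety net. A minimal usage sketch (the task choice is illustrative, and it will still open a real session through `@with_session`):

```python
import pytest


@pytest.mark.asyncio
async def test_task_runs_inline(taskiq_broker):
    from reflector.worker.cleanup import cleanup_old_public_data_task

    # with await_inplace=True this executes the task body before returning
    await cleanup_old_public_data_task.kiq(days=1)
    # nothing should be left in flight, but awaiting is harmless either way
    await taskiq_broker.wait_all()
```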
@@ -130,15 +130,15 @@ async def test_sync_all_ics_calendars(db_session):
         ics_enabled=False,
     )

-    with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay:
+    with patch("reflector.worker.ics_sync.sync_room_ics.kiq") as mock_kiq:
         ics_enabled_rooms = await rooms_controller.get_ics_enabled(db_session)

         for room in ics_enabled_rooms:
             if room and _should_sync(room):
-                sync_room_ics.delay(room.id)
+                await sync_room_ics.kiq(room.id)

-        assert mock_delay.call_count == 2
-        called_room_ids = [call.args[0] for call in mock_delay.call_args_list]
+        assert mock_kiq.call_count == 2
+        called_room_ids = [call.args[0] for call in mock_kiq.call_args_list]
         assert room1.id in called_room_ids
         assert room2.id in called_room_ids
         assert room3.id not in called_room_ids
@@ -210,15 +210,15 @@ async def test_sync_respects_fetch_interval(db_session):
         {"ics_last_sync": now - timedelta(seconds=100)},
     )

-    with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay:
+    with patch("reflector.worker.ics_sync.sync_room_ics.kiq") as mock_kiq:
         ics_enabled_rooms = await rooms_controller.get_ics_enabled(db_session)

         for room in ics_enabled_rooms:
             if room and _should_sync(room):
-                sync_room_ics.delay(room.id)
+                await sync_room_ics.kiq(room.id)

-        assert mock_delay.call_count == 1
-        assert mock_delay.call_args[0][0] == room2.id
+        assert mock_kiq.call_count == 1
+        assert mock_kiq.call_args[0][0] == room2.id


 @pytest.mark.asyncio

@@ -297,6 +297,7 @@ async def mock_summary_processor():

 @pytest.mark.asyncio
 async def test_pipeline_main_file_process(
+    db_session,
     tmpdir,
     mock_transcript_in_db,
     dummy_file_transcript,
@@ -377,7 +378,7 @@ async def test_pipeline_main_file_process(
     mock_av.side_effect = [mock_container, mock_decode_container]

     # Run the pipeline
-    await pipeline.process(upload_path)
+    await pipeline.process(db_session, upload_path)

     # Verify audio extraction and writing
     assert mock_audio_file_writer.push.called
@@ -422,6 +423,7 @@ async def test_pipeline_main_file_process(

 @pytest.mark.asyncio
 async def test_pipeline_main_file_with_video(
+    db_session,
     tmpdir,
     mock_transcript_in_db,
     dummy_file_transcript,
@@ -468,7 +470,7 @@ async def test_pipeline_main_file_with_video(
     mock_av.side_effect = [mock_container, mock_decode_container]

     # Run the pipeline
-    await pipeline.process(upload_path)
+    await pipeline.process(db_session, upload_path)

     # Verify audio extraction from video
     assert mock_audio_file_writer.push.called
@@ -486,6 +488,7 @@ async def test_pipeline_main_file_with_video(

 @pytest.mark.asyncio
 async def test_pipeline_main_file_no_diarization(
+    db_session,
     tmpdir,
     mock_transcript_in_db,
     dummy_file_transcript,
@@ -533,7 +536,7 @@ async def test_pipeline_main_file_no_diarization(
     mock_av.side_effect = [mock_container, mock_decode_container]

     # Run the pipeline
-    await pipeline.process(upload_path)
+    await pipeline.process(db_session, upload_path)

     # Verify the pipeline completed without diarization
     assert mock_storage._put_file.called
@@ -547,6 +550,7 @@ async def test_pipeline_main_file_no_diarization(

 @pytest.mark.asyncio
 async def test_task_pipeline_file_process(
+    db_session,
     tmpdir,
     mock_transcript_in_db,
     dummy_file_transcript,
@@ -593,7 +597,7 @@ async def test_task_pipeline_file_process(
     from reflector.pipelines.main_file_pipeline import PipelineMainFile

     pipeline = PipelineMainFile(transcript_id=mock_transcript_in_db.id)
-    await pipeline.process(upload_path)
+    await pipeline.process(db_session, upload_path)

     # Verify the pipeline was executed through the task
     assert mock_audio_file_writer.push.called
@@ -633,6 +637,7 @@ async def test_pipeline_file_process_no_transcript():

 @pytest.mark.asyncio
 async def test_pipeline_file_process_no_audio_file(
+    db_session,
     mock_transcript_in_db,
 ):
     """
@@ -650,4 +655,4 @@ async def test_pipeline_file_process_no_audio_file(

     # This should fail when trying to open the file with av
     with pytest.raises(Exception):
-        await pipeline.process(non_existent_path)
+        await pipeline.process(db_session, non_existent_path)

@@ -1,9 +1,11 @@
 import asyncio
-import time
+import os

 import pytest
 from httpx import ASGITransport, AsyncClient

+# Set environment for TaskIQ to use InMemoryBroker
+os.environ["ENVIRONMENT"] = "pytest"


 @pytest.fixture
 async def app_lifespan():
@@ -23,8 +25,16 @@ async def client(app_lifespan):
     )


-@pytest.mark.usefixtures("celery_session_app")
-@pytest.mark.usefixtures("celery_session_worker")
+@pytest.fixture
+async def taskiq_broker():
+    from reflector.worker.app import taskiq_broker
+
+    # Broker is already initialized as InMemoryBroker due to ENVIRONMENT=pytest
+    await taskiq_broker.startup()
+    yield taskiq_broker
+    await taskiq_broker.shutdown()


 @pytest.mark.asyncio
 async def test_transcript_process(
     tmpdir,
@@ -34,7 +44,10 @@ async def test_transcript_process(
     dummy_file_diarization,
     dummy_storage,
     client,
+    taskiq_broker,
+    db_session,
 ):
+    print("IN TEST", db_session)
     # create a transcript
     response = await client.post("/transcripts", json={"name": "test"})
     assert response.status_code == 200
@@ -55,18 +68,14 @@ async def test_transcript_process(
     assert response.status_code == 200
     assert response.json()["status"] == "ok"

-    # wait for processing to finish (max 1 minute)
-    timeout_seconds = 60
-    start_time = time.monotonic()
-    while (time.monotonic() - start_time) < timeout_seconds:
-        # fetch the transcript and check if it is ended
-        resp = await client.get(f"/transcripts/{tid}")
-        assert resp.status_code == 200
-        if resp.json()["status"] in ("ended", "error"):
-            break
-        await asyncio.sleep(1)
-    else:
-        pytest.fail(f"Initial processing timed out after {timeout_seconds} seconds")
+    # Wait for all tasks to complete since we're using InMemoryBroker
+    await taskiq_broker.wait_all()

+    # Ensure it's finished ok
     resp = await client.get(f"/transcripts/{tid}")
     assert resp.status_code == 200
+    print(resp.json())
     assert resp.json()["status"] in ("ended", "error")

     # restart the processing
     response = await client.post(
@@ -74,20 +83,15 @@ async def test_transcript_process(
     )
     assert response.status_code == 200
     assert response.json()["status"] == "ok"
-    await asyncio.sleep(2)

-    # wait for processing to finish (max 1 minute)
-    timeout_seconds = 60
-    start_time = time.monotonic()
-    while (time.monotonic() - start_time) < timeout_seconds:
-        # fetch the transcript and check if it is ended
-        resp = await client.get(f"/transcripts/{tid}")
-        assert resp.status_code == 200
-        if resp.json()["status"] in ("ended", "error"):
-            break
-        await asyncio.sleep(1)
-    else:
-        pytest.fail(f"Restart processing timed out after {timeout_seconds} seconds")
+    # Wait for all tasks to complete since we're using InMemoryBroker
+    await taskiq_broker.wait_all()

+    # Ensure it's finished ok
     resp = await client.get(f"/transcripts/{tid}")
     assert resp.status_code == 200
+    print(resp.json())
     assert resp.json()["status"] in ("ended", "error")

     # check the transcript is ended
     transcript = resp.json()

@@ -49,7 +49,7 @@ class ThreadedUvicorn:


 @pytest.fixture
-def appserver(tmpdir, database, celery_session_app, celery_session_worker):
+def appserver(tmpdir, database):
     import threading

     from reflector.app import app
@@ -111,8 +111,6 @@ def appserver(tmpdir, database):
     settings.DATA_DIR = DATA_DIR


-@pytest.mark.usefixtures("celery_session_app")
-@pytest.mark.usefixtures("celery_session_worker")
 @pytest.mark.asyncio
 async def test_transcript_rtc_and_websocket(
     tmpdir,
@@ -275,8 +273,6 @@ async def test_transcript_rtc_and_websocket(
     assert audio_resp.headers["Content-Type"] == "audio/mpeg"


-@pytest.mark.usefixtures("celery_session_app")
-@pytest.mark.usefixtures("celery_session_worker")
 @pytest.mark.asyncio
 async def test_transcript_rtc_and_websocket_and_fr(
     tmpdir,
@@ -4,8 +4,6 @@ import time
 import pytest


-@pytest.mark.usefixtures("celery_session_app")
-@pytest.mark.usefixtures("celery_session_worker")
 @pytest.mark.asyncio
 async def test_transcript_upload_file(
     tmpdir,

302
server/uv.lock
generated
302
server/uv.lock
generated
@@ -210,18 +210,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c2/62/96b5217b742805236614f05904541000f55422a6060a90d7fd4ce26c172d/alembic-1.16.4-py3-none-any.whl", hash = "sha256:b05e51e8e82efc1abd14ba2af6392897e145930c3e0a2faf2b0da2f7f7fd660d", size = 247026 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "amqp"
|
||||
version = "5.3.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "vine" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/79/fc/ec94a357dfc6683d8c86f8b4cfa5416a4c36b28052ec8260c77aca96a443/amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432", size = 129013 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/26/99/fc813cd978842c26c82534010ea849eee9ab3a13ea2b74e95cb9c99e747b/amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", size = 50944 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "annotated-types"
|
||||
version = "0.7.0"
|
||||
@@ -375,15 +363,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/50/cd/30110dc0ffcf3b131156077b90e9f60ed75711223f306da4db08eff8403b/beautifulsoup4-4.13.4-py3-none-any.whl", hash = "sha256:9bbbb14bfde9d79f38b8cd5f8c7c85f4b8f2523190ebed90e950a8dea4cb1c4b", size = 187285 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "billiard"
|
||||
version = "4.2.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/7c/58/1546c970afcd2a2428b1bfafecf2371d8951cc34b46701bea73f4280989e/billiard-4.2.1.tar.gz", hash = "sha256:12b641b0c539073fc8d3f5b8b7be998956665c4233c7c1fcd66a7e677c4fb36f", size = 155031 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/30/da/43b15f28fe5f9e027b41c539abc5469052e9d48fd75f8ff094ba2a0ae767/billiard-4.2.1-py3-none-any.whl", hash = "sha256:40b59a4ac8806ba2c2369ea98d876bc6108b051c227baffd928c644d15d8f3cb", size = 86766 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "black"
|
||||
version = "24.1.1"
|
||||
@@ -436,25 +415,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/7e/83/a753562020b69fa90cebc39e8af2c753b24dcdc74bee8355ee3f6cefdf34/botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8", size = 13580545 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "celery"
|
||||
version = "5.5.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "billiard" },
|
||||
{ name = "click" },
|
||||
{ name = "click-didyoumean" },
|
||||
{ name = "click-plugins" },
|
||||
{ name = "click-repl" },
|
||||
{ name = "kombu" },
|
||||
{ name = "python-dateutil" },
|
||||
{ name = "vine" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/bb/7d/6c289f407d219ba36d8b384b42489ebdd0c84ce9c413875a8aae0c85f35b/celery-5.5.3.tar.gz", hash = "sha256:6c972ae7968c2b5281227f01c3a3f984037d21c5129d07bf3550cc2afc6b10a5", size = 1667144 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c9/af/0dcccc7fdcdf170f9a1585e5e96b6fb0ba1749ef6be8c89a6202284759bd/celery-5.5.3-py3-none-any.whl", hash = "sha256:0b5761a07057acee94694464ca482416b959568904c9dfa41ce8413a7d65d525", size = 438775 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2025.7.14"
|
||||
@@ -545,43 +505,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "click-didyoumean"
|
||||
version = "0.3.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "click" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/30/ce/217289b77c590ea1e7c24242d9ddd6e249e52c795ff10fac2c50062c48cb/click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463", size = 3089 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/1b/5b/974430b5ffdb7a4f1941d13d83c64a0395114503cc357c6b9ae4ce5047ed/click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c", size = 3631 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "click-plugins"
|
||||
version = "1.1.1.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "click" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/c3/a4/34847b59150da33690a36da3681d6bbc2ec14ee9a846bc30a6746e5984e4/click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261", size = 8343 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/3d/9a/2abecb28ae875e39c8cad711eb1186d8d14eab564705325e77e4e6ab9ae5/click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6", size = 11051 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "click-repl"
|
||||
version = "0.3.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "click" },
|
||||
{ name = "prompt-toolkit" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/cb/a2/57f4ac79838cfae6912f997b4d1a64a858fb0c86d7fcaae6f7b58d267fca/click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9", size = 10449 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/52/40/9d857001228658f0d59e97ebd4c346fe73e138c6de1bce61dc568a57c7f8/click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812", size = 10289 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorama"
|
||||
version = "0.4.6"
|
||||
@@ -775,23 +698,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c3/be/d0d44e092656fe7a06b55e6103cbce807cdbdee17884a5367c68c9860853/dataclasses_json-0.6.7-py3-none-any.whl", hash = "sha256:0dbf33f26c8d5305befd61b39d2b3414e8a407bedc2834dea9b8d642666fb40a", size = 28686 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "debugpy"
|
||||
version = "1.8.15"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/8c/8b/3a9a28ddb750a76eaec445c7f4d3147ea2c579a97dbd9e25d39001b92b21/debugpy-1.8.15.tar.gz", hash = "sha256:58d7a20b7773ab5ee6bdfb2e6cf622fdf1e40c9d5aef2857d85391526719ac00", size = 1643279 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/d2/b3/1c44a2ed311199ab11c2299c9474a6c7cd80d19278defd333aeb7c287995/debugpy-1.8.15-cp311-cp311-macosx_14_0_universal2.whl", hash = "sha256:babc4fb1962dd6a37e94d611280e3d0d11a1f5e6c72ac9b3d87a08212c4b6dd3", size = 2183442 },
|
||||
{ url = "https://files.pythonhosted.org/packages/f6/69/e2dcb721491e1c294d348681227c9b44fb95218f379aa88e12a19d85528d/debugpy-1.8.15-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f778e68f2986a58479d0ac4f643e0b8c82fdd97c2e200d4d61e7c2d13838eb53", size = 3134215 },
|
||||
{ url = "https://files.pythonhosted.org/packages/17/76/4ce63b95d8294dcf2fd1820860b300a420d077df4e93afcaa25a984c2ca7/debugpy-1.8.15-cp311-cp311-win32.whl", hash = "sha256:f9d1b5abd75cd965e2deabb1a06b0e93a1546f31f9f621d2705e78104377c702", size = 5154037 },
|
||||
{ url = "https://files.pythonhosted.org/packages/c2/a7/e5a7c784465eb9c976d84408873d597dc7ce74a0fc69ed009548a1a94813/debugpy-1.8.15-cp311-cp311-win_amd64.whl", hash = "sha256:62954fb904bec463e2b5a415777f6d1926c97febb08ef1694da0e5d1463c5c3b", size = 5178133 },
|
||||
{ url = "https://files.pythonhosted.org/packages/ab/4a/4508d256e52897f5cdfee6a6d7580974811e911c6d01321df3264508a5ac/debugpy-1.8.15-cp312-cp312-macosx_14_0_universal2.whl", hash = "sha256:3dcc7225cb317469721ab5136cda9ff9c8b6e6fb43e87c9e15d5b108b99d01ba", size = 2511197 },
|
||||
{ url = "https://files.pythonhosted.org/packages/99/8d/7f6ef1097e7fecf26b4ef72338d08e41644a41b7ee958a19f494ffcffc29/debugpy-1.8.15-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:047a493ca93c85ccede1dbbaf4e66816794bdc214213dde41a9a61e42d27f8fc", size = 4229517 },
|
||||
{ url = "https://files.pythonhosted.org/packages/3f/e8/e8c6a9aa33a9c9c6dacbf31747384f6ed2adde4de2e9693c766bdf323aa3/debugpy-1.8.15-cp312-cp312-win32.whl", hash = "sha256:b08e9b0bc260cf324c890626961dad4ffd973f7568fbf57feb3c3a65ab6b6327", size = 5276132 },
|
||||
{ url = "https://files.pythonhosted.org/packages/e9/ad/231050c6177b3476b85fcea01e565dac83607b5233d003ff067e2ee44d8f/debugpy-1.8.15-cp312-cp312-win_amd64.whl", hash = "sha256:e2a4fe357c92334272eb2845fcfcdbec3ef9f22c16cf613c388ac0887aed15fa", size = 5317645 },
|
||||
{ url = "https://files.pythonhosted.org/packages/07/d5/98748d9860e767a1248b5e31ffa7ce8cb7006e97bf8abbf3d891d0a8ba4e/debugpy-1.8.15-py2.py3-none-any.whl", hash = "sha256:bce2e6c5ff4f2e00b98d45e7e01a49c7b489ff6df5f12d881c67d2f1ac635f3d", size = 5282697 },
]

[[package]]
name = "defusedxml"
version = "0.7.1"
@@ -840,20 +746,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/68/1b/e0a87d256e40e8c888847551b20a017a6b98139178505dc7ffb96f04e954/dnspython-2.7.0-py3-none-any.whl", hash = "sha256:b4c34b7d10b51bcc3a5071e7b8dee77939f1e878477eeecc965e9835f63c6c86", size = 313632 },
]

[[package]]
name = "docker"
version = "7.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pywin32", marker = "sys_platform == 'win32'" },
{ name = "requests" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/9b/4a2ea29aeba62471211598dac5d96825bb49348fa07e906ea930394a83ce/docker-7.1.0.tar.gz", hash = "sha256:ad8c70e6e3f8926cb8a92619b832b4ea5299e2831c14284663184e200546fa6c", size = 117834 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/26/57c6fb270950d476074c087527a558ccb6f4436657314bfb6cdf484114c4/docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0", size = 147774 },
]

[[package]]
name = "docopt"
version = "0.6.2"
@@ -1319,6 +1211,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9c/1f/19ebc343cc71a7ffa78f17018535adc5cbdd87afb31d7c34874680148b32/ifaddr-0.2.0-py3-none-any.whl", hash = "sha256:085e0305cfe6f16ab12d72e2024030f5d52674afad6911bb1eee207177b8a748", size = 12314 },
]

[[package]]
name = "importlib-metadata"
version = "8.7.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "zipp" },
]
sdist = { url = "https://files.pythonhosted.org/packages/76/66/650a33bd90f786193e4de4b3ad86ea60b53c89b669a5c7be931fac31cdb0/importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000", size = 56641 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/b0/36bd937216ec521246249be3bf9855081de4c5e06a0c9b4219dbeda50373/importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd", size = 27656 },
]

[[package]]
name = "iniconfig"
version = "2.1.0"
@@ -1328,6 +1232,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050 },
]

[[package]]
name = "izulu"
version = "0.50.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d0/58/6d6335c78b7ade54d8a6c6dbaa589e5c21b3fd916341d5a16f774c72652a/izulu-0.50.0.tar.gz", hash = "sha256:cc8e252d5e8560c70b95380295008eeb0786f7b745a405a40d3556ab3252d5f5", size = 48558 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/9f/bf9d33546bbb6e5e80ebafe46f90b7d8b4a77410b7b05160b0ca8978c15a/izulu-0.50.0-py3-none-any.whl", hash = "sha256:4e9ae2508844e7c5f62c468a8b9e2deba2f60325ef63f01e65b39fd9a6b3fab4", size = 18095 },
]

[[package]]
name = "jinja2"
version = "3.1.6"
@@ -1479,21 +1392,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/da/e9/0d4add7873a73e462aeb45c036a2dead2562b825aa46ba326727b3f31016/kiwisolver-1.4.9-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:fb940820c63a9590d31d88b815e7a3aa5915cad3ce735ab45f0c730b39547de1", size = 73929 },
]

[[package]]
name = "kombu"
version = "5.5.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "amqp" },
{ name = "packaging" },
{ name = "tzdata" },
{ name = "vine" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0f/d3/5ff936d8319ac86b9c409f1501b07c426e6ad41966fedace9ef1b966e23f/kombu-5.5.4.tar.gz", hash = "sha256:886600168275ebeada93b888e831352fe578168342f0d1d5833d88ba0d847363", size = 461992 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/70/a07dcf4f62598c8ad579df241af55ced65bed76e42e45d3c368a6d82dbc1/kombu-5.5.4-py3-none-any.whl", hash = "sha256:a12ed0557c238897d8e518f1d1fdf84bd1516c5e305af2dacd85c2015115feb8", size = 210034 },
]

[[package]]
name = "levenshtein"
version = "0.27.1"
@@ -2305,18 +2203,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/27/72/0824c18f3bc75810f55dacc2dd933f6ec829771180245ae3cc976195dec0/prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9", size = 19296 },
]

[[package]]
name = "prompt-toolkit"
version = "3.0.51"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "wcwidth" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/6e/9d084c929dfe9e3bfe0c6a47e31f78a25c54627d64a66e884a8bf5474f1c/prompt_toolkit-3.0.51.tar.gz", hash = "sha256:931a162e3b27fc90c86f1b48bb1fb2c528c2761475e57c9c06de13311c7b54ed", size = 428940 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/4f/5249960887b1fbe561d9ff265496d170b55a735b76724f10ef19f9e40716/prompt_toolkit-3.0.51-py3-none-any.whl", hash = "sha256:52742911fde84e2d423e2f9a4cf1de7d7ac4e51958f648d9540e0fb8db077b07", size = 387810 },
]

[[package]]
name = "propcache"
version = "0.3.2"
@@ -2372,21 +2258,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724 },
]

[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051 },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535 },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004 },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986 },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544 },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053 },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885 },
]

[[package]]
name = "psycopg2-binary"
version = "2.9.10"
@@ -2539,6 +2410,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552 },
]

[[package]]
name = "pycron"
version = "3.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8e/5d/340be12ae4a69c33102dfb6ddc1dc6e53e69b2d504fa26b5d34a472c3057/pycron-3.2.0.tar.gz", hash = "sha256:e125a28aca0295769541a40633f70b602579df48c9cb357c36c28d2628ba2b13", size = 4248 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/76/caf316909f4545e7158e0e1defd8956a1da49f4af04f5d16b18c358dfeac/pycron-3.2.0-py3-none-any.whl", hash = "sha256:6d2349746270bd642b71b9f7187cf13f4d9ee2412b4710396a507b5fe4f60dac", size = 4904 },
]

[[package]]
name = "pydantic"
version = "2.11.7"
@@ -2782,25 +2662,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/9d/bf86eddabf8c6c9cb1ea9a869d6873b46f105a5d292d3a6f7071f5b07935/pytest_asyncio-1.1.0-py3-none-any.whl", hash = "sha256:5fe2d69607b0bd75c656d1211f969cadba035030156745ee09e7d71740e58ecf", size = 15157 },
]

[[package]]
name = "pytest-celery"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "celery" },
{ name = "debugpy" },
{ name = "docker" },
{ name = "kombu" },
{ name = "psutil" },
{ name = "pytest-docker-tools" },
{ name = "setuptools" },
{ name = "tenacity" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f4/a0/fcc6a3fbd8c06d04d206da27e1062eaee8ddc0cbaf2db72e66aa74f240e8/pytest_celery-1.2.0.tar.gz", hash = "sha256:de605eca1b0134c136910c8ed161cc3996b0c8aaafd29170878a396eed81b5b1", size = 28043 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0a/f2/b16389f7ceccd96fd887762626c1509cdd39fdc8f7d4cec59cb1e9716df5/pytest_celery-1.2.0-py3-none-any.whl", hash = "sha256:d81d22a3bed21eb180fa2ee2dd701a00cc4f7c3f1d578e99620c887cad331fb6", size = 49040 },
]

[[package]]
name = "pytest-cov"
version = "6.2.1"
@@ -2828,19 +2689,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c5/c7/e057e0d1de611ce1bbb26cccf07ddf56eb30a6f6a03aa512a09dac356e03/pytest_docker-3.2.3-py3-none-any.whl", hash = "sha256:f973c35e6f2b674c8fc87e8b3354b02c15866a21994c0841a338c240a05de1eb", size = 8585 },
]

[[package]]
name = "pytest-docker-tools"
version = "3.1.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "docker" },
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a6/2a/4d68472c2bac257c4e7389b5ca3a6b6cd7da88bce012bed321fdd31372ae/pytest_docker_tools-3.1.9.tar.gz", hash = "sha256:1b6a0cb633c20145731313335ef15bcf5571839c06726764e60cbe495324782b", size = 42824 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/57/79/9dae84c244dabebca6a952e098d6ac9d13719b701fc5323ba6d00abc675a/pytest_docker_tools-3.1.9-py2.py3-none-any.whl", hash = "sha256:36f8e88d56d84ea177df68a175673681243dd991d2807fbf551d90f60341bfdb", size = 29268 },
]

[[package]]
name = "pytest-env"
version = "1.1.5"
@@ -2974,19 +2822,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225 },
]

[[package]]
name = "pywin32"
version = "311"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7c/af/449a6a91e5d6db51420875c54f6aff7c97a86a3b13a0b4f1a5c13b988de3/pywin32-311-cp311-cp311-win32.whl", hash = "sha256:184eb5e436dea364dcd3d2316d577d625c0351bf237c4e9a5fabbcfa5a58b151", size = 8697031 },
{ url = "https://files.pythonhosted.org/packages/51/8f/9bb81dd5bb77d22243d33c8397f09377056d5c687aa6d4042bea7fbf8364/pywin32-311-cp311-cp311-win_amd64.whl", hash = "sha256:3ce80b34b22b17ccbd937a6e78e7225d80c52f5ab9940fe0506a1a16f3dab503", size = 9508308 },
{ url = "https://files.pythonhosted.org/packages/44/7b/9c2ab54f74a138c491aba1b1cd0795ba61f144c711daea84a88b63dc0f6c/pywin32-311-cp311-cp311-win_arm64.whl", hash = "sha256:a733f1388e1a842abb67ffa8e7aad0e70ac519e09b0f6a784e65a136ec7cefd2", size = 8703930 },
{ url = "https://files.pythonhosted.org/packages/e7/ab/01ea1943d4eba0f850c3c61e78e8dd59757ff815ff3ccd0a84de5f541f42/pywin32-311-cp312-cp312-win32.whl", hash = "sha256:750ec6e621af2b948540032557b10a2d43b0cee2ae9758c54154d711cc852d31", size = 8706543 },
{ url = "https://files.pythonhosted.org/packages/d1/a8/a0e8d07d4d051ec7502cd58b291ec98dcc0c3fff027caad0470b72cfcc2f/pywin32-311-cp312-cp312-win_amd64.whl", hash = "sha256:b8c095edad5c211ff31c05223658e71bf7116daa0ecf3ad85f3201ea3190d067", size = 9495040 },
{ url = "https://files.pythonhosted.org/packages/ba/3a/2ae996277b4b50f17d61f0603efd8253cb2d79cc7ae159468007b586396d/pywin32-311-cp312-cp312-win_arm64.whl", hash = "sha256:e286f46a9a39c4a18b319c28f59b61de793654af2f395c102b4f819e584b5852", size = 8710102 },
]

[[package]]
name = "pyyaml"
version = "6.0.2"
@@ -3094,7 +2929,6 @@ dependencies = [
{ name = "alembic" },
{ name = "asyncpg" },
{ name = "av" },
{ name = "celery" },
{ name = "fastapi", extra = ["standard"] },
{ name = "fastapi-pagination" },
{ name = "httpx" },
@@ -3118,6 +2952,8 @@ dependencies = [
{ name = "sortedcontainers" },
{ name = "sqlalchemy" },
{ name = "structlog" },
{ name = "taskiq" },
{ name = "taskiq-redis" },
{ name = "transformers" },
{ name = "uvicorn", extra = ["standard"] },
{ name = "webvtt-py" },
@@ -3156,7 +2992,6 @@ tests = [
{ name = "pytest" },
{ name = "pytest-aiohttp" },
{ name = "pytest-asyncio" },
{ name = "pytest-celery" },
{ name = "pytest-cov" },
{ name = "pytest-docker" },
{ name = "pytest-httpx" },
@@ -3171,7 +3006,6 @@ requires-dist = [
{ name = "alembic", specifier = ">=1.11.3" },
{ name = "asyncpg", specifier = ">=0.29.0" },
{ name = "av", specifier = ">=10.0.0" },
{ name = "celery", specifier = ">=5.3.4" },
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },
{ name = "fastapi-pagination", specifier = ">=0.12.6" },
{ name = "httpx", specifier = ">=0.24.1" },
@@ -3195,6 +3029,8 @@ requires-dist = [
{ name = "sortedcontainers", specifier = ">=2.4.0" },
{ name = "sqlalchemy", specifier = ">=2.0.0" },
{ name = "structlog", specifier = ">=23.1.0" },
{ name = "taskiq", specifier = ">=0.11.18" },
{ name = "taskiq-redis", specifier = ">=1.1.0" },
{ name = "transformers", specifier = ">=4.36.2" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.23.1" },
{ name = "webvtt-py", specifier = ">=0.5.0" },
@@ -3229,7 +3065,6 @@ tests = [
{ name = "pytest", specifier = ">=7.4.0" },
{ name = "pytest-aiohttp", specifier = ">=1.0.4" },
{ name = "pytest-asyncio", specifier = ">=0.21.1" },
{ name = "pytest-celery", specifier = ">=0.0.0" },
{ name = "pytest-cov", specifier = ">=4.1.0" },
{ name = "pytest-docker", specifier = ">=3.2.3" },
{ name = "pytest-httpx", specifier = ">=0.23.1" },
@@ -3807,6 +3642,48 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252 },
]

[[package]]
name = "taskiq"
version = "0.11.18"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "importlib-metadata" },
{ name = "izulu" },
{ name = "packaging" },
{ name = "pycron" },
{ name = "pydantic" },
{ name = "pytz" },
{ name = "taskiq-dependencies" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/4d/0d1b3b6c77a45d7a8c685a9c916b2532cca36a26771831949b874f6d15c3/taskiq-0.11.18.tar.gz", hash = "sha256:b83e1b70aee74d0a197d4a4a5ba165b8ba85b12a2b3b7ebfa3c6fdcc9e3128a7", size = 54323 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/d5/46505f57c140d10d4c36f6bd2f2047fb0460e4d5b9b841dc3b93ab8c893d/taskiq-0.11.18-py3-none-any.whl", hash = "sha256:0df58be24e4ef5d19c8ef02581d35d392b0d780d3fe37950e0478022b85ce288", size = 79608 },
]

[[package]]
name = "taskiq-dependencies"
version = "1.5.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/47/90/47a627696e53bfdcacabc3e8c05b73bf1424685bcb5f17209cb8b12da1bf/taskiq_dependencies-1.5.7.tar.gz", hash = "sha256:0d3b240872ef152b719153b9526d866d2be978aeeaea6600e878414babc2dcb4", size = 14875 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/99/6d/4a012f2de002c2e93273f5e7d3e3feea02f7fdbb7b75ca2ca1dd10703091/taskiq_dependencies-1.5.7-py3-none-any.whl", hash = "sha256:6fcee5d159bdb035ef915d4d848826169b6f06fe57cc2297a39b62ea3e76036f", size = 13801 },
]

[[package]]
name = "taskiq-redis"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "redis" },
{ name = "taskiq" },
]
sdist = { url = "https://files.pythonhosted.org/packages/59/73/ce7dd796950f792c94ea7e59609ff0ab5205fea72bbb60054be6f8757b1f/taskiq_redis-1.1.0.tar.gz", hash = "sha256:c10bac567cce0b83caa22640da54305d6ddb74ad1324e638367e1b2bd46f682d", size = 15770 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/24/f9/0622ce1c5100b008b34cf7719a576491c1a0f22f50f8db090d797d8de6dc/taskiq_redis-1.1.0-py3-none-any.whl", hash = "sha256:516abe3cd703a7d97a5c0979102082e295d6cf2396a43b1c572382798df221cd", size = 20095 },
]

[[package]]
name = "tenacity"
version = "9.1.2"
@@ -4255,15 +4132,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/13/5d/1f15b252890c968d42b348d1e9b0aa12d5bf3e776704178ec37cceccdb63/vcrpy-7.0.0-py2.py3-none-any.whl", hash = "sha256:55791e26c18daa363435054d8b35bd41a4ac441b6676167635d1b37a71dbe124", size = 42321 },
]

[[package]]
name = "vine"
version = "5.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/bd/e4/d07b5f29d283596b9727dd5275ccbceb63c44a1a82aa9e4bfd20426762ac/vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0", size = 48980 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/03/ff/7c0c86c43b3cbb927e0ccc0255cb4057ceba4799cd44ae95174ce8e8b5b2/vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc", size = 9636 },
]

[[package]]
name = "watchfiles"
version = "1.1.0"
@@ -4305,15 +4173,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/bd/d3/254cea30f918f489db09d6a8435a7de7047f8cb68584477a515f160541d6/watchfiles-1.1.0-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:923fec6e5461c42bd7e3fd5ec37492c6f3468be0499bc0707b4bbbc16ac21792", size = 454009 },
]

[[package]]
name = "wcwidth"
version = "0.2.13"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6c/63/53559446a878410fc5a5974feb13d31d78d752eb18aeba59c7fef1af7598/wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5", size = 101301 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166 },
]

[[package]]
name = "websockets"
version = "15.0.1"
@@ -4444,3 +4303,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/eb/83/5d9092950565481b413b31a23e75dd3418ff0a277d6e0abf3729d4d1ce25/yarl-1.20.1-cp312-cp312-win_amd64.whl", hash = "sha256:48ea7d7f9be0487339828a4de0360d7ce0efc06524a48e1810f945c45b813698", size = 86710 },
{ url = "https://files.pythonhosted.org/packages/b4/2d/2345fce04cfd4bee161bf1e7d9cdc702e3e16109021035dbb24db654a622/yarl-1.20.1-py3-none-any.whl", hash = "sha256:83b8eb083fe4683c6115795d9fc1cfaf2cbbefb19b3a1cb68f6527460f483a77", size = 46542 },
]

[[package]]
name = "zipp"
version = "3.23.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276 },
]