mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-05 10:26:48 +00:00
Compare commits
2 Commits
v0.29.0
...
fix/websoc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
454aecf270 | ||
|
|
dee1555807 |
14
CHANGELOG.md
14
CHANGELOG.md
@@ -1,19 +1,5 @@
|
||||
# Changelog
|
||||
|
||||
## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
|
||||
|
||||
## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6))
|
||||
|
||||
## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)
|
||||
|
||||
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
"""replace_use_hatchet_with_use_celery
|
||||
|
||||
Revision ID: 80beb1ea3269
|
||||
Revises: bd3a729bb379
|
||||
Create Date: 2026-01-20 16:26:25.555869
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "80beb1ea3269"
|
||||
down_revision: Union[str, None] = "bd3a729bb379"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||
batch_op.add_column(
|
||||
sa.Column(
|
||||
"use_celery",
|
||||
sa.Boolean(),
|
||||
server_default=sa.text("false"),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
batch_op.drop_column("use_hatchet")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||
batch_op.add_column(
|
||||
sa.Column(
|
||||
"use_hatchet",
|
||||
sa.Boolean(),
|
||||
server_default=sa.text("false"),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
batch_op.drop_column("use_celery")
|
||||
@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
|
||||
nullable=False,
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"use_celery",
|
||||
"use_hatchet",
|
||||
sqlalchemy.Boolean,
|
||||
nullable=False,
|
||||
server_default=false(),
|
||||
@@ -97,7 +97,7 @@ class Room(BaseModel):
|
||||
ics_last_sync: datetime | None = None
|
||||
ics_last_etag: str | None = None
|
||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||
use_celery: bool = False
|
||||
use_hatchet: bool = False
|
||||
skip_consent: bool = False
|
||||
|
||||
|
||||
|
||||
@@ -12,9 +12,14 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
|
||||
daily_multitrack_pipeline,
|
||||
)
|
||||
from reflector.logger import logger
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
def main():
|
||||
if not settings.HATCHET_ENABLED:
|
||||
logger.error("HATCHET_ENABLED is False, not starting CPU workers")
|
||||
return
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -11,6 +11,7 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
|
||||
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
|
||||
from reflector.hatchet.workflows.track_processing import track_workflow
|
||||
from reflector.logger import logger
|
||||
from reflector.settings import settings
|
||||
|
||||
SLOTS = 10
|
||||
WORKER_NAME = "llm-worker-pool"
|
||||
@@ -18,6 +19,10 @@ POOL = "llm-io"
|
||||
|
||||
|
||||
def main():
|
||||
if not settings.HATCHET_ENABLED:
|
||||
logger.error("HATCHET_ENABLED is False, not starting LLM workers")
|
||||
return
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -319,6 +319,21 @@ class ICSSyncService:
|
||||
calendar = self.fetch_service.parse_ics(ics_content)
|
||||
|
||||
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
|
||||
if room.ics_last_etag == content_hash:
|
||||
logger.info("No changes in ICS for room", room_id=room.id)
|
||||
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
||||
events, total_events = self.fetch_service.extract_room_events(
|
||||
calendar, room.name, room_url
|
||||
)
|
||||
return {
|
||||
"status": SyncStatus.UNCHANGED,
|
||||
"hash": content_hash,
|
||||
"events_found": len(events),
|
||||
"total_events": total_events,
|
||||
"events_created": 0,
|
||||
"events_updated": 0,
|
||||
"events_deleted": 0,
|
||||
}
|
||||
|
||||
# Extract matching events
|
||||
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
||||
@@ -356,44 +371,6 @@ class ICSSyncService:
|
||||
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
|
||||
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
||||
|
||||
def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool:
|
||||
"""Check if event data has changed by comparing relevant fields.
|
||||
|
||||
IMPORTANT: When adding fields to CalendarEvent/EventData, update this method
|
||||
and the _COMPARED_FIELDS set below for runtime validation.
|
||||
"""
|
||||
# Fields that come from ICS and should trigger updates when changed
|
||||
_COMPARED_FIELDS = {
|
||||
"title",
|
||||
"description",
|
||||
"start_time",
|
||||
"end_time",
|
||||
"location",
|
||||
"attendees",
|
||||
"ics_raw_data",
|
||||
}
|
||||
|
||||
# Runtime exhaustiveness check: ensure we're comparing all EventData fields
|
||||
event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"}
|
||||
if event_data_fields != _COMPARED_FIELDS:
|
||||
missing = event_data_fields - _COMPARED_FIELDS
|
||||
extra = _COMPARED_FIELDS - event_data_fields
|
||||
raise RuntimeError(
|
||||
f"_event_data_changed() field mismatch: "
|
||||
f"missing={missing}, extra={extra}. "
|
||||
f"Update the comparison logic when adding/removing fields."
|
||||
)
|
||||
|
||||
return (
|
||||
existing.title != new_data["title"]
|
||||
or existing.description != new_data["description"]
|
||||
or existing.start_time != new_data["start_time"]
|
||||
or existing.end_time != new_data["end_time"]
|
||||
or existing.location != new_data["location"]
|
||||
or existing.attendees != new_data["attendees"]
|
||||
or existing.ics_raw_data != new_data["ics_raw_data"]
|
||||
)
|
||||
|
||||
async def _sync_events_to_database(
|
||||
self, room_id: str, events: list[EventData]
|
||||
) -> SyncStats:
|
||||
@@ -409,14 +386,11 @@ class ICSSyncService:
|
||||
)
|
||||
|
||||
if existing:
|
||||
# Only count as updated if data actually changed
|
||||
if self._event_data_changed(existing, event_data):
|
||||
updated += 1
|
||||
await calendar_events_controller.upsert(calendar_event)
|
||||
else:
|
||||
created += 1
|
||||
await calendar_events_controller.upsert(calendar_event)
|
||||
|
||||
await calendar_events_controller.upsert(calendar_event)
|
||||
current_ics_uids.append(event_data["ics_uid"])
|
||||
|
||||
# Soft delete events that are no longer in calendar
|
||||
|
||||
@@ -23,6 +23,7 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_multitrack_pipeline import (
|
||||
task_pipeline_multitrack_process,
|
||||
)
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
|
||||
@@ -101,8 +102,8 @@ async def validate_transcript_for_processing(
|
||||
if transcript.locked:
|
||||
return ValidationLocked(detail="Recording is locked")
|
||||
|
||||
# Check if recording is ready for processing
|
||||
if transcript.status == "idle" and not transcript.workflow_run_id:
|
||||
# hatchet is idempotent anyways + if it wasn't dispatched successfully
|
||||
if transcript.status == "idle" and not settings.HATCHET_ENABLED:
|
||||
return ValidationNotReady(detail="Recording is not ready for processing")
|
||||
|
||||
# Check Celery tasks
|
||||
@@ -115,8 +116,7 @@ async def validate_transcript_for_processing(
|
||||
):
|
||||
return ValidationAlreadyScheduled(detail="already running")
|
||||
|
||||
# Check Hatchet workflow status if workflow_run_id exists
|
||||
if transcript.workflow_run_id:
|
||||
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
|
||||
try:
|
||||
status = await HatchetClientManager.get_workflow_run_status(
|
||||
transcript.workflow_run_id
|
||||
@@ -181,16 +181,19 @@ async def dispatch_transcript_processing(
|
||||
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
||||
"""
|
||||
if isinstance(config, MultitrackProcessingConfig):
|
||||
use_celery = False
|
||||
# Check if room has use_hatchet=True (overrides env vars)
|
||||
room_forces_hatchet = False
|
||||
if config.room_id:
|
||||
room = await rooms_controller.get_by_id(config.room_id)
|
||||
use_celery = room.use_celery if room else False
|
||||
room_forces_hatchet = room.use_hatchet if room else False
|
||||
|
||||
use_hatchet = not use_celery
|
||||
# Start durable workflow if enabled (Hatchet)
|
||||
# and if room has use_hatchet=True
|
||||
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
|
||||
|
||||
if use_celery:
|
||||
if room_forces_hatchet:
|
||||
logger.info(
|
||||
"Room uses legacy Celery processing",
|
||||
"Room forces Hatchet workflow",
|
||||
room_id=config.room_id,
|
||||
transcript_id=config.transcript_id,
|
||||
)
|
||||
|
||||
@@ -158,10 +158,19 @@ class Settings(BaseSettings):
|
||||
ZULIP_API_KEY: str | None = None
|
||||
ZULIP_BOT_EMAIL: str | None = None
|
||||
|
||||
# Hatchet workflow orchestration (always enabled for multitrack processing)
|
||||
# Durable workflow orchestration
|
||||
# Provider: "hatchet" (or "none" to disable)
|
||||
DURABLE_WORKFLOW_PROVIDER: str = "none"
|
||||
|
||||
# Hatchet workflow orchestration
|
||||
HATCHET_CLIENT_TOKEN: str | None = None
|
||||
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
|
||||
HATCHET_DEBUG: bool = False
|
||||
|
||||
@property
|
||||
def HATCHET_ENABLED(self) -> bool:
|
||||
"""True if Hatchet is the active provider."""
|
||||
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
@@ -287,12 +287,11 @@ async def _process_multitrack_recording_inner(
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
|
||||
|
||||
if use_celery:
|
||||
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
|
||||
logger.info(
|
||||
"Room uses legacy Celery processing",
|
||||
"Room forces Hatchet workflow",
|
||||
room_id=room.id,
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
@@ -811,6 +810,7 @@ async def reprocess_failed_daily_recordings():
|
||||
)
|
||||
continue
|
||||
|
||||
# Fetch room to check use_hatchet flag
|
||||
room = None
|
||||
if meeting.room_id:
|
||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||
@@ -834,10 +834,10 @@ async def reprocess_failed_daily_recordings():
|
||||
)
|
||||
continue
|
||||
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
|
||||
|
||||
if use_hatchet:
|
||||
# Hatchet requires a transcript for workflow_run_id tracking
|
||||
if not transcript:
|
||||
logger.warning(
|
||||
"No transcript for Hatchet reprocessing, skipping",
|
||||
|
||||
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
|
||||
import redis.asyncio as redis
|
||||
from fastapi import WebSocket
|
||||
@@ -98,8 +97,10 @@ class WebsocketManager:
|
||||
|
||||
async def _pubsub_data_reader(self, pubsub_subscriber):
|
||||
while True:
|
||||
# timeout=1.0 prevents tight CPU loop when no messages available
|
||||
message = await pubsub_subscriber.get_message(
|
||||
ignore_subscribe_messages=True
|
||||
ignore_subscribe_messages=True,
|
||||
timeout=1.0,
|
||||
)
|
||||
if message is not None:
|
||||
room_id = message["channel"].decode("utf-8")
|
||||
@@ -109,29 +110,38 @@ class WebsocketManager:
|
||||
await socket.send_json(data)
|
||||
|
||||
|
||||
# Process-global singleton to ensure only one WebsocketManager instance exists.
|
||||
# Multiple instances would cause resource leaks and CPU issues.
|
||||
_ws_manager: WebsocketManager | None = None
|
||||
|
||||
|
||||
def get_ws_manager() -> WebsocketManager:
|
||||
"""
|
||||
Returns the WebsocketManager instance for managing websockets.
|
||||
Returns the global WebsocketManager singleton.
|
||||
|
||||
This function initializes and returns the WebsocketManager instance,
|
||||
which is responsible for managing websockets and handling websocket
|
||||
connections.
|
||||
Creates instance on first call, subsequent calls return cached instance.
|
||||
Thread-safe via GIL. Concurrent initialization may create duplicate
|
||||
instances but last write wins (acceptable for this use case).
|
||||
|
||||
Returns:
|
||||
WebsocketManager: The initialized WebsocketManager instance.
|
||||
|
||||
Raises:
|
||||
ImportError: If the 'reflector.settings' module cannot be imported.
|
||||
RedisConnectionError: If there is an error connecting to the Redis server.
|
||||
WebsocketManager: The global WebsocketManager instance.
|
||||
"""
|
||||
local = threading.local()
|
||||
if hasattr(local, "ws_manager"):
|
||||
return local.ws_manager
|
||||
global _ws_manager
|
||||
|
||||
if _ws_manager is not None:
|
||||
return _ws_manager
|
||||
|
||||
# No lock needed - GIL makes this safe enough
|
||||
# Worst case: race creates two instances, last assignment wins
|
||||
pubsub_client = RedisPubSubManager(
|
||||
host=settings.REDIS_HOST,
|
||||
port=settings.REDIS_PORT,
|
||||
)
|
||||
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
local.ws_manager = ws_manager
|
||||
return ws_manager
|
||||
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
return _ws_manager
|
||||
|
||||
|
||||
def reset_ws_manager() -> None:
|
||||
"""Reset singleton for testing. DO NOT use in production."""
|
||||
global _ws_manager
|
||||
_ws_manager = None
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from tempfile import NamedTemporaryFile
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@@ -333,10 +332,17 @@ def celery_enable_logging():
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_config():
|
||||
with NamedTemporaryFile() as f:
|
||||
# Use Redis for chord/group task execution (memory:// broker doesn't support chords)
|
||||
# Redis must be running - start with: docker compose up -d redis
|
||||
import os
|
||||
|
||||
redis_host = os.environ.get("REDIS_HOST", "localhost")
|
||||
redis_port = os.environ.get("REDIS_PORT", "6379")
|
||||
# Use db 2 to avoid conflicts with main app
|
||||
redis_url = f"redis://{redis_host}:{redis_port}/2"
|
||||
yield {
|
||||
"broker_url": "memory://",
|
||||
"result_backend": f"db+sqlite:///{f.name}",
|
||||
"broker_url": redis_url,
|
||||
"result_backend": redis_url,
|
||||
}
|
||||
|
||||
|
||||
@@ -370,9 +376,12 @@ async def ws_manager_in_memory(monkeypatch):
|
||||
def __init__(self, queue: asyncio.Queue):
|
||||
self.queue = queue
|
||||
|
||||
async def get_message(self, ignore_subscribe_messages: bool = True):
|
||||
async def get_message(
|
||||
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
|
||||
):
|
||||
wait_timeout = timeout if timeout is not None else 0.05
|
||||
try:
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
Tests for Hatchet workflow dispatch and routing logic.
|
||||
|
||||
These tests verify:
|
||||
1. Hatchet workflow validation and replay logic
|
||||
2. Force flag to cancel and restart workflows
|
||||
3. Validation prevents concurrent workflows
|
||||
1. Routing to Hatchet when HATCHET_ENABLED=True
|
||||
2. Replay logic for failed workflows
|
||||
3. Force flag to cancel and restart
|
||||
4. Validation prevents concurrent workflows
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
@@ -33,6 +34,9 @@ async def test_hatchet_validation_blocks_running_workflow():
|
||||
workflow_run_id="running-workflow-123",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -68,6 +72,9 @@ async def test_hatchet_validation_blocks_queued_workflow():
|
||||
workflow_run_id="queued-workflow-123",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -103,6 +110,9 @@ async def test_hatchet_validation_allows_failed_workflow():
|
||||
recording_id="test-recording-id",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -139,6 +149,9 @@ async def test_hatchet_validation_allows_completed_workflow():
|
||||
recording_id="test-recording-id",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -174,6 +187,9 @@ async def test_hatchet_validation_allows_when_status_check_fails():
|
||||
recording_id="test-recording-id",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -211,6 +227,9 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
|
||||
recording_id="test-recording-id",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = True
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet:
|
||||
@@ -229,6 +248,38 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
|
||||
assert isinstance(result, ValidationOk)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
@pytest.mark.asyncio
|
||||
async def test_hatchet_validation_skipped_when_disabled():
|
||||
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
|
||||
from reflector.services.transcript_process import (
|
||||
ValidationOk,
|
||||
validate_transcript_for_processing,
|
||||
)
|
||||
|
||||
mock_transcript = Transcript(
|
||||
id="test-transcript-id",
|
||||
name="Test",
|
||||
status="uploaded",
|
||||
source_kind="room",
|
||||
workflow_run_id="some-workflow-123",
|
||||
recording_id="test-recording-id",
|
||||
)
|
||||
|
||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
||||
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
|
||||
|
||||
with patch(
|
||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
||||
) as mock_celery_check:
|
||||
mock_celery_check.return_value = False
|
||||
|
||||
result = await validate_transcript_for_processing(mock_transcript)
|
||||
|
||||
# Should not check Hatchet at all
|
||||
assert isinstance(result, ValidationOk)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
@pytest.mark.asyncio
|
||||
async def test_validation_locked_transcript():
|
||||
|
||||
@@ -189,17 +189,14 @@ async def test_ics_sync_service_sync_room_calendar():
|
||||
assert events[0].ics_uid == "sync-event-1"
|
||||
assert events[0].title == "Sync Test Meeting"
|
||||
|
||||
# Second sync with same content (calendar unchanged, but sync always runs)
|
||||
# Second sync with same content (should be unchanged)
|
||||
# Refresh room to get updated etag and force sync by setting old sync time
|
||||
room = await rooms_controller.get_by_id(room.id)
|
||||
await rooms_controller.update(
|
||||
room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
|
||||
)
|
||||
result = await sync_service.sync_room_calendar(room)
|
||||
assert result["status"] == "success"
|
||||
assert result["events_created"] == 0
|
||||
assert result["events_updated"] == 0
|
||||
assert result["events_deleted"] == 0
|
||||
assert result["status"] == "unchanged"
|
||||
|
||||
# Third sync with updated event
|
||||
event["summary"] = "Updated Meeting Title"
|
||||
@@ -291,43 +288,3 @@ async def test_ics_sync_service_error_handling():
|
||||
result = await sync_service.sync_room_calendar(room)
|
||||
assert result["status"] == "error"
|
||||
assert "Network error" in result["error"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_data_changed_exhaustiveness():
|
||||
"""Test that _event_data_changed compares all EventData fields (except ics_uid).
|
||||
|
||||
This test ensures programmers don't forget to update the comparison logic
|
||||
when adding new fields to EventData/CalendarEvent.
|
||||
"""
|
||||
from reflector.services.ics_sync import EventData
|
||||
|
||||
sync_service = ICSSyncService()
|
||||
|
||||
from reflector.db.calendar_events import CalendarEvent
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
event_data: EventData = {
|
||||
"ics_uid": "test-123",
|
||||
"title": "Test",
|
||||
"description": "Desc",
|
||||
"location": "Loc",
|
||||
"start_time": now,
|
||||
"end_time": now + timedelta(hours=1),
|
||||
"attendees": [],
|
||||
"ics_raw_data": "raw",
|
||||
}
|
||||
|
||||
existing = CalendarEvent(
|
||||
room_id="room1",
|
||||
**event_data,
|
||||
)
|
||||
|
||||
# Will raise RuntimeError if fields are missing from comparison
|
||||
result = sync_service._event_data_changed(existing, event_data)
|
||||
assert result is False
|
||||
|
||||
modified_data = event_data.copy()
|
||||
modified_data["title"] = "Changed Title"
|
||||
result = sync_service._event_data_changed(existing, modified_data)
|
||||
assert result is True
|
||||
|
||||
@@ -162,24 +162,9 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import transcripts_controller
|
||||
|
||||
room = await rooms_controller.add(
|
||||
name="test-room",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
)
|
||||
# Force Celery backend for test
|
||||
await rooms_controller.update(room, {"use_celery": True})
|
||||
|
||||
# Create transcript with Daily.co multitrack recording
|
||||
transcript = await transcripts_controller.add(
|
||||
"",
|
||||
source_kind="room",
|
||||
@@ -187,7 +172,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
||||
target_language="en",
|
||||
user_id="test-user",
|
||||
share_mode="public",
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
track_keys = [
|
||||
|
||||
@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
|
||||
settings.DATA_DIR = DATA_DIR
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_includes():
|
||||
return ["reflector.pipelines.main_live_pipeline"]
|
||||
# Using celery_includes from conftest.py which includes both pipelines
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
|
||||
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
|
||||
|
||||
if server_instance:
|
||||
server_instance.should_exit = True
|
||||
server_thread.join(timeout=30)
|
||||
server_thread.join(timeout=2.0)
|
||||
|
||||
# Reset global singleton for test isolation
|
||||
from reflector.ws_manager import reset_ws_manager
|
||||
|
||||
reset_ws_manager()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -133,6 +138,11 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
|
||||
# Connect and then trigger an event via HTTP create
|
||||
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
|
||||
# Give Redis pubsub time to establish subscription before publishing
|
||||
import asyncio
|
||||
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Emit an event to the user's room via a standard HTTP action
|
||||
from httpx import AsyncClient
|
||||
|
||||
@@ -150,6 +160,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
"email": "user-abc@example.com",
|
||||
}
|
||||
|
||||
# Use in-memory client (global singleton makes it share ws_manager)
|
||||
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
|
||||
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
|
||||
resp = await ac.post("/transcripts", json={"name": "WS Test"})
|
||||
|
||||
20
server/uv.lock
generated
20
server/uv.lock
generated
@@ -330,26 +330,6 @@ name = "av"
|
||||
version = "14.4.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 },
|
||||
{ url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 },
|
||||
{ url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 },
|
||||
{ url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 },
|
||||
{ url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 },
|
||||
{ url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 },
|
||||
{ url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 },
|
||||
{ url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 },
|
||||
{ url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 },
|
||||
{ url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 },
|
||||
{ url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 },
|
||||
{ url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 },
|
||||
{ url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 },
|
||||
{ url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 },
|
||||
{ url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
|
||||
{ url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
|
||||
{ url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
|
||||
{ url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "banks"
|
||||
|
||||
Reference in New Issue
Block a user