- Add email service for sending meeting invites with ICS attachments - Add scheduler for background calendar sync jobs - Add Zulip service for meeting notifications - Make ics_url optional for participants - Add /api/schedule endpoint with 2-hour lead time validation - Update frontend to support scheduling flow Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models import BusyBlock, Participant
|
|
|
|
|
|
def get_week_boundaries(reference_date: datetime | None = None) -> tuple[datetime, datetime]:
|
|
if reference_date is None:
|
|
reference_date = datetime.now(timezone.utc)
|
|
|
|
days_since_monday = reference_date.weekday()
|
|
monday = reference_date - timedelta(days=days_since_monday)
|
|
monday = monday.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
friday = monday + timedelta(days=4)
|
|
friday = friday.replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
|
|
return monday, friday
|
|
|
|
|
|
async def get_busy_blocks_for_participants(
|
|
db: AsyncSession,
|
|
participant_ids: list[UUID],
|
|
start_time: datetime,
|
|
end_time: datetime,
|
|
) -> dict[UUID, list[tuple[datetime, datetime]]]:
|
|
stmt = select(BusyBlock).where(
|
|
BusyBlock.participant_id.in_(participant_ids),
|
|
BusyBlock.start_time < end_time,
|
|
BusyBlock.end_time > start_time,
|
|
)
|
|
result = await db.execute(stmt)
|
|
blocks = result.scalars().all()
|
|
|
|
busy_map: dict[UUID, list[tuple[datetime, datetime]]] = {
|
|
pid: [] for pid in participant_ids
|
|
}
|
|
for block in blocks:
|
|
busy_map[block.participant_id].append((block.start_time, block.end_time))
|
|
|
|
return busy_map
|
|
|
|
|
|
def is_participant_free(
|
|
busy_blocks: list[tuple[datetime, datetime]],
|
|
slot_start: datetime,
|
|
slot_end: datetime,
|
|
) -> bool:
|
|
for block_start, block_end in busy_blocks:
|
|
if block_start < slot_end and block_end > slot_start:
|
|
return False
|
|
return True
|
|
|
|
|
|
async def calculate_availability(
|
|
db: AsyncSession,
|
|
participant_ids: list[UUID],
|
|
reference_date: datetime | None = None,
|
|
) -> list[dict]:
|
|
week_start, week_end = get_week_boundaries(reference_date)
|
|
busy_map = await get_busy_blocks_for_participants(
|
|
db, participant_ids, week_start, week_end
|
|
)
|
|
|
|
participants_stmt = select(Participant).where(Participant.id.in_(participant_ids))
|
|
participants_result = await db.execute(participants_stmt)
|
|
participants = {p.id: p for p in participants_result.scalars().all()}
|
|
|
|
days = ["Mon", "Tue", "Wed", "Thu", "Fri"]
|
|
hours = list(range(9, 18))
|
|
slots = []
|
|
|
|
for day_offset, day_name in enumerate(days):
|
|
for hour in hours:
|
|
slot_start = week_start + timedelta(days=day_offset, hours=hour)
|
|
slot_end = slot_start + timedelta(hours=1)
|
|
|
|
available_participants = []
|
|
for pid in participant_ids:
|
|
if is_participant_free(busy_map.get(pid, []), slot_start, slot_end):
|
|
participant = participants.get(pid)
|
|
if participant:
|
|
available_participants.append(participant.name)
|
|
|
|
total = len(participant_ids)
|
|
available_count = len(available_participants)
|
|
|
|
if available_count == total:
|
|
availability = "full"
|
|
elif available_count > 0:
|
|
availability = "partial"
|
|
else:
|
|
availability = "none"
|
|
|
|
slots.append({
|
|
"day": slot_start.strftime("%Y-%m-%d"),
|
|
"hour": hour,
|
|
"availability": availability,
|
|
"availableParticipants": available_participants,
|
|
})
|
|
|
|
return slots
|