Compare commits

13 Commits

Author SHA1 Message Date
1878834ce6 chore(main): release 0.5.0 (#521) 2025-07-31 20:11:41 -06:00
f5b82d44e3 style: use ruff for linting and formatting (#524) 2025-07-31 17:57:43 -06:00
ad56165b54 fix: remove unused settings and utils files (#522)
* fix: remove unused settings and utils files

* fix: remove migration done

* fix: remove outdated scripts

* fix: removing deployment of hermes, not used anymore

* fix: partially remove secret, still have to understand frontend.
2025-07-31 17:45:48 -06:00
4ee19ed015 ci: update pull request template (#523) 2025-07-31 17:45:19 -06:00
406164033d feat: new summary using phi-4 and llama-index (#519)
* feat: add litellm backend implementation

* refactor: improve generate/completion methods for base LLM

* refactor: remove tokenizer logic

* style: apply code formatting

* fix: remove hallucinations from LLM responses

* refactor: comprehensive LLM and summarization rework

* chore: remove debug code

* feat: add structured output support to LiteLLM

* refactor: apply self-review improvements

* docs: add model structured output comments

* docs: update model structured output comments

* style: apply linting and formatting fixes

* fix: resolve type logic bug

* refactor: apply PR review feedback

* refactor: apply additional PR review feedback

* refactor: apply final PR review feedback

* fix: improve schema passing for LLMs without structured output

* feat: add PR comments and logger improvements

* docs: update README and add HTTP logging

* feat: improve HTTP logging

* feat: add summary chunking functionality

* fix: resolve title generation runtime issues

* refactor: apply self-review improvements

* style: apply linting and formatting

* feat: implement LiteLLM class structure

* style: apply linting and formatting fixes

* docs: env template model name fix

* chore: remove older litellm class

* chore: format

* refactor: simplify OpenAILLM

* refactor: OpenAILLM tokenizer

* refactor: self-review

* refactor: self-review

* refactor: self-review

* chore: format

* chore: remove LLM_USE_STRUCTURED_OUTPUT from envs

* chore: roll back migration lint changes

* chore: roll back migration lint changes

* fix: make summary llm configuration optional for the tests

* fix: missing f-string

* fix: tweak the prompt for summary title

* feat: try llamaindex for summarization

* fix: complete refactor of summary builder using llamaindex and structured output when possible

* fix: separate prompt as constant

* fix: typings

* fix: enhance prompt to prevent mentioning others subject while summarize one

* fix: various changes after self-review

* fix: from igor review

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-07-31 15:29:29 -06:00
81d316cb56 ci: remove conventional commit for ci (#520)
As we now squash merge, only the conventional commit is required for the
title of the PR
2025-07-31 15:19:16 -06:00
db3beae5cd chore(main): release 0.4.0 (#510) 2025-07-25 19:09:57 -06:00
Igor Loskutov
03b9a18c1b fix: remove faulty import Meeting (#512)
* fix: remove faulty import Meeting

* fix: remove faulty import Meeting
2025-07-25 17:48:10 -04:00
Igor Loskutov
7e3027adb6 fix: room concurrency (theoretically) (#511)
* fix: room concurrency (theoretically)

* cleanup

* cleanup
2025-07-25 17:37:51 -04:00
Igor Loskutov
27b43d85ab feat: Diarization cli (#509)
* diarisation cli

* feat: s3 upload for modal diarisation cli call

* chore: cleanup

* chore: s3 cleanup improvement

* chore: lint

* chore: cleanup

* chore: cleanup

* chore: cleanup

* chore: cleanup
2025-07-25 16:24:06 -04:00
2289a1a231 chore(main): release 0.3.2 (#506) 2025-07-22 19:15:47 -06:00
d0e130eb13 fix: match font size for the filter sidebar (#507) 2025-07-22 14:59:23 -06:00
24fabe3e86 fix: whereby consent not displaying (#505) 2025-07-22 12:20:26 -06:00
121 changed files with 2322 additions and 1943 deletions

View File

@@ -1,19 +1,21 @@
## ⚠️ Insert the PR TITLE replacing this text ⚠️
<!--- Provide a general summary of your changes in the Title above -->
⚠️ Describe your PR replacing this text. Post screenshots or videos whenever possible. ⚠️
## Description
<!--- Describe your changes in detail -->
### Checklist
## Related Issue
<!--- This project only accepts pull requests related to open issues -->
<!--- If suggesting a new feature or change, please discuss it in an issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps to reproduce -->
<!--- Please link to the issue here: -->
- [ ] My branch is updated with main (mandatory)
- [ ] I wrote unit tests for this (if applies)
- [ ] I have included migrations and tested them locally (if applies)
- [ ] I have manually tested this feature locally
## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
> IMPORTANT: Remember that you are responsible for merging this PR after it's been reviewed, and once deployed
> you should perform manual testing to make sure everything went smoothly.
### Urgency
- [ ] Urgent (deploy ASAP)
- [ ] Non-urgent (deploying in next release is ok)
## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
## Screenshots (if appropriate):

View File

@@ -1,19 +0,0 @@
name: Conventional commit PR
on: [pull_request]
jobs:
cog_check_job:
runs-on: ubuntu-latest
name: check conventional commit compliance
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
# pick the pr HEAD instead of the merge commit
ref: ${{ github.event.pull_request.head.sha }}
- name: Conventional commit check
uses: cocogitto/cocogitto-action@v3
with:
check-latest-tag-only: true
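
With squash merges, only the PR title needs to follow the Conventional Commits format, which is why the per-commit check above was removed. A minimal sketch of a title-only check follows. The `is_conventional_title` helper and the allow-list of types are assumptions inferred from the commit titles in this comparison, not the project's CI:

```python
import re

# Hypothetical allow-list, inferred from the commit titles in this comparison.
ALLOWED_TYPES = ("feat", "fix", "chore", "ci", "docs", "style", "refactor", "test")

# type, optional (scope), optional "!", then ": " and a non-empty description.
TITLE_RE = re.compile(
    r"^(?P<type>[a-z]+)(\((?P<scope>[^()]+)\))?(?P<breaking>!)?: (?P<desc>\S.*)$"
)


def is_conventional_title(title: str) -> bool:
    """Return True if a PR title looks like a Conventional Commits header."""
    match = TITLE_RE.match(title)
    return bool(match) and match.group("type") in ALLOWED_TYPES
```

Under these assumptions, a title such as `fix: room concurrency (theoretically) (#511)` passes, while a bare `cleanup` does not.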

View File

@@ -15,25 +15,16 @@ repos:
hooks:
- id: debug-statements
- id: trailing-whitespace
exclude: ^server/trials
- id: detect-private-key
- repo: https://github.com/psf/black
rev: 24.1.1
hooks:
- id: black
files: ^server/(reflector|tests)/
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
files: ^server/(gpu|evaluate|reflector)/
args: [ "--profile", "black", "--filter-files" ]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.5
rev: v0.8.2
hooks:
- id: ruff
files: ^server/(reflector|tests)/
args:
- --fix
- --select
- I,F401
files: ^server/
- id: ruff-format
files: ^server/

View File

@@ -1,5 +1,38 @@
# Changelog
## [0.5.0](https://github.com/Monadical-SAS/reflector/compare/v0.4.0...v0.5.0) (2025-07-31)
### Features
* new summary using phi-4 and llama-index ([#519](https://github.com/Monadical-SAS/reflector/issues/519)) ([1bf9ce0](https://github.com/Monadical-SAS/reflector/commit/1bf9ce07c12f87f89e68a1dbb3b2c96c5ee62466))
### Bug Fixes
* remove unused settings and utils files ([#522](https://github.com/Monadical-SAS/reflector/issues/522)) ([2af4790](https://github.com/Monadical-SAS/reflector/commit/2af4790e4be9e588f282fbc1bb171c88a03d6479))
## [0.4.0](https://github.com/Monadical-SAS/reflector/compare/v0.3.2...v0.4.0) (2025-07-25)
### Features
* Diarization cli ([#509](https://github.com/Monadical-SAS/reflector/issues/509)) ([ffc8003](https://github.com/Monadical-SAS/reflector/commit/ffc8003e6dad236930a27d0fe3e2f2adfb793890))
### Bug Fixes
* remove faulty import Meeting ([#512](https://github.com/Monadical-SAS/reflector/issues/512)) ([0e68c79](https://github.com/Monadical-SAS/reflector/commit/0e68c798434e1b481f9482cc3a4702ea00365df4))
* room concurrency (theoretically) ([#511](https://github.com/Monadical-SAS/reflector/issues/511)) ([7bb3676](https://github.com/Monadical-SAS/reflector/commit/7bb367653afeb2778cff697a0eb217abf0b81b84))
## [0.3.2](https://github.com/Monadical-SAS/reflector/compare/v0.3.1...v0.3.2) (2025-07-22)
### Bug Fixes
* match font size for the filter sidebar ([#507](https://github.com/Monadical-SAS/reflector/issues/507)) ([4b8ba5d](https://github.com/Monadical-SAS/reflector/commit/4b8ba5db1733557e27b098ad3d1cdecadf97ae52))
* whereby consent not displaying ([#505](https://github.com/Monadical-SAS/reflector/issues/505)) ([1120552](https://github.com/Monadical-SAS/reflector/commit/1120552c2c83d084d3a39272ad49b6aeda1af98f))
## [0.3.1](https://github.com/Monadical-SAS/reflector/compare/v0.3.0...v0.3.1) (2025-07-22)

View File

@@ -172,3 +172,7 @@ Modal.com integration for scalable ML processing:
- **Audio Routing**: Use BlackHole (Mac) for merging multiple audio sources
- **WebRTC**: Ensure proper CORS configuration for cross-origin streaming
- **Database**: Run `uv run alembic upgrade head` after pulling schema changes
## Pipeline/worker related info
If you need to do any worker/pipeline related work, search for "Pipeline" classes and their "create" or "build" methods to find the main processor sequence. Look for task orchestration patterns (like "chord", "group", or "chain") to identify the post-processing flow with parallel execution chains. This will give you an abstract view of how the processing pipeline is organized.
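
The orchestration terms named above ("chain", "group", "chord") can be illustrated without the project's code. The sketch below is a plain `asyncio` analogy, not the Reflector pipeline or any task-queue API, and every function and step name in it is hypothetical:

```python
import asyncio


async def step(name: str, log: list[str]) -> str:
    """Hypothetical processing step: record that it ran and return its name."""
    await asyncio.sleep(0)  # yield control, as a real I/O-bound task would
    log.append(name)
    return name


async def run_pipeline() -> tuple[list[str], str]:
    log: list[str] = []

    # "chain": steps run strictly one after another.
    await step("transcribe", log)
    await step("diarize", log)

    # "group": independent steps run concurrently.
    results = await asyncio.gather(
        step("title", log),
        step("summary", log),
    )

    # "chord": a callback that runs only once the whole group has finished.
    final = await step("finalize:" + "+".join(results), log)
    return log, final
```

Running `asyncio.run(run_pipeline())` returns the execution log and the final step's name, which makes the ordering guarantees of each pattern easy to inspect.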

View File

@@ -4,8 +4,8 @@
Reflector Audio Management and Analysis is a cutting-edge web application under development by Monadical. It utilizes AI to record meetings, providing a permanent record with transcripts, translations, and automated summaries.
[![Tests](https://github.com/monadical-sas/cubbi/actions/workflows/pytests.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/cubbi/actions/workflows/pytests.yml)
[![License: MIT](https://img.shields.io/badge/license-AGPL--v3-green.svg)](https://opensource.org/licenses/AGPL-v3)
[![Tests](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
</div>
## Screenshots
@@ -74,7 +74,7 @@ Note: We currently do not have instructions for Windows users.
### Frontend
Start with `cd backend`.
Start with `cd www`.
**Installation**

View File

@@ -1,16 +0,0 @@
TRANSCRIPT_BACKEND=modal
TRANSCRIPT_URL=https://monadical-sas--reflector-transcriber-web.modal.run
TRANSCRIPT_MODAL_API_KEY=***REMOVED***
LLM_BACKEND=modal
LLM_URL=https://monadical-sas--reflector-llm-web.modal.run
LLM_MODAL_API_KEY=***REMOVED***
TRANSLATE_URL=https://monadical-sas--reflector-translator-web.modal.run
ZEPHYR_LLM_URL=https://monadical-sas--reflector-llm-zephyr-web.modal.run
DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run
BASE_URL=https://xxxxx.ngrok.app
DIARIZATION_ENABLED=false
SQS_POLLING_TIMEOUT_SECONDS=60

server/.gitignore
View File

@@ -180,3 +180,4 @@ reflector.sqlite3
data/
dump.rdb

View File

@@ -20,3 +20,23 @@ Polls SQS every 60 seconds via /server/reflector/worker/process.py:24-62:
# Every 60 seconds, check for new recordings
sqs = boto3.client("sqs", ...)
response = sqs.receive_message(QueueUrl=queue_url, ...)
# Requeue
```bash
uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID
```
## Pipeline Management
### Continue stuck pipeline from final summaries (identify_participants) step:
```bash
uv run python -c "from reflector.pipelines.main_live_pipeline import task_pipeline_final_summaries; result = task_pipeline_final_summaries.delay(transcript_id='TRANSCRIPT_ID'); print(f'Task queued: {result.id}')"
```
### Run full post-processing pipeline (continues to completion):
```bash
uv run python -c "from reflector.pipelines.main_live_pipeline import pipeline_post; pipeline_post(transcript_id='TRANSCRIPT_ID')"
```

View File

@@ -20,7 +20,6 @@ AUTH_JWT_AUDIENCE=
## Using local whisper
#TRANSCRIPT_BACKEND=whisper
#WHISPER_MODEL_SIZE=tiny
## Using serverless modal.com (require reflector-gpu-modal deployed)
#TRANSCRIPT_BACKEND=modal
@@ -30,7 +29,7 @@ AUTH_JWT_AUDIENCE=
TRANSCRIPT_BACKEND=modal
TRANSCRIPT_URL=https://monadical-sas--reflector-transcriber-web.modal.run
TRANSCRIPT_MODAL_API_KEY=***REMOVED***
TRANSCRIPT_MODAL_API_KEY=
## =======================================================
## Transcription backend
@@ -50,7 +49,7 @@ TRANSLATE_URL=https://monadical-sas--reflector-translator-web.modal.run
## Using serverless modal.com (require reflector-gpu-modal deployed)
LLM_BACKEND=modal
LLM_URL=https://monadical-sas--reflector-llm-web.modal.run
LLM_MODAL_API_KEY=***REMOVED***
LLM_MODAL_API_KEY=
ZEPHYR_LLM_URL=https://monadical-sas--reflector-llm-zephyr-web.modal.run
@@ -70,6 +69,16 @@ ZEPHYR_LLM_URL=https://monadical-sas--reflector-llm-zephyr-web.modal.run
## Cache directory to store models
CACHE_DIR=data
## =======================================================
## Summary LLM configuration
## =======================================================
## Context size for summary generation (tokens)
SUMMARY_LLM_CONTEXT_SIZE_TOKENS=16000
SUMMARY_LLM_URL=
SUMMARY_LLM_API_KEY=sk-
SUMMARY_MODEL=
## =======================================================
## Diarization
##
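
The new `SUMMARY_*` variables in the hunk above could be read as in the following sketch. It uses only the standard library; the `SummarySettings` class and `load_summary_settings` function are hypothetical and are not the project's settings module. Treating empty values as unset is an assumption, in line with the commit note that the summary LLM configuration is optional for tests:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SummarySettings:
    context_size_tokens: int
    url: Optional[str]
    api_key: Optional[str]
    model: Optional[str]


def load_summary_settings(env: Mapping[str, str] = os.environ) -> SummarySettings:
    """Read the SUMMARY_* variables, treating empty values as unset."""

    def optional(name: str) -> Optional[str]:
        value = env.get(name, "").strip()
        return value or None

    return SummarySettings(
        # 16000 mirrors the value shown in the env template.
        context_size_tokens=int(
            env.get("SUMMARY_LLM_CONTEXT_SIZE_TOKENS") or "16000"
        ),
        url=optional("SUMMARY_LLM_URL"),
        api_key=optional("SUMMARY_LLM_API_KEY"),
        model=optional("SUMMARY_MODEL"),
    )
```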

View File

@@ -9,7 +9,6 @@ import os
import threading
from typing import Optional
import modal
from modal import App, Image, Secret, asgi_app, enter, exit, method
# LLM

View File

@@ -9,7 +9,6 @@ import os
import threading
from typing import Optional
import modal
from modal import App, Image, Secret, asgi_app, enter, exit, method
# LLM

View File

@@ -1,171 +0,0 @@
# # Run an OpenAI-Compatible vLLM Server
import modal
MODELS_DIR = "/llamas"
MODEL_NAME = "NousResearch/Hermes-3-Llama-3.1-8B"
N_GPU = 1
def download_llm():
from huggingface_hub import snapshot_download
print("Downloading LLM model")
snapshot_download(
MODEL_NAME,
local_dir=f"{MODELS_DIR}/{MODEL_NAME}",
ignore_patterns=[
"*.pt",
"*.bin",
"*.pth",
"original/*",
], # Ensure safetensors
)
print("LLM model downloaded")
def move_cache():
from transformers.utils import move_cache as transformers_move_cache
transformers_move_cache()
vllm_image = (
modal.Image.debian_slim(python_version="3.10")
.pip_install("vllm==0.5.3post1")
.env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
.pip_install(
# "accelerate==0.34.2",
"einops==0.8.0",
"hf-transfer~=0.1",
)
.run_function(download_llm)
.run_function(move_cache)
.pip_install(
"bitsandbytes>=0.42.9",
)
)
app = modal.App("reflector-vllm-hermes3")
@app.function(
image=vllm_image,
gpu=modal.gpu.A100(count=N_GPU, size="40GB"),
timeout=60 * 5,
scaledown_window=60 * 5,
allow_concurrent_inputs=100,
secrets=[
modal.Secret.from_name("reflector-gpu"),
],
)
@modal.asgi_app()
def serve():
import os
import fastapi
import vllm.entrypoints.openai.api_server as api_server
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
from vllm.usage.usage_lib import UsageContext
TOKEN = os.environ["REFLECTOR_GPU_APIKEY"]
# create a fastAPI app that uses vLLM's OpenAI-compatible router
web_app = fastapi.FastAPI(
title=f"OpenAI-compatible {MODEL_NAME} server",
description="Run an OpenAI-compatible LLM server with vLLM on modal.com",
version="0.0.1",
docs_url="/docs",
)
# security: CORS middleware for external requests
http_bearer = fastapi.security.HTTPBearer(
scheme_name="Bearer Token",
description="See code for authentication details.",
)
web_app.add_middleware(
fastapi.middleware.cors.CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# security: inject dependency on authed routes
async def is_authenticated(api_key: str = fastapi.Security(http_bearer)):
if api_key.credentials != TOKEN:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return {"username": "authenticated_user"}
router = fastapi.APIRouter(dependencies=[fastapi.Depends(is_authenticated)])
# wrap vllm's router in auth router
router.include_router(api_server.router)
# add authed vllm to our fastAPI app
web_app.include_router(router)
engine_args = AsyncEngineArgs(
model=MODELS_DIR + "/" + MODEL_NAME,
tensor_parallel_size=N_GPU,
gpu_memory_utilization=0.90,
# max_model_len=8096,
enforce_eager=False, # capture the graph for faster inference, but slower cold starts (30s > 20s)
# --- 4 bits load
# quantization="bitsandbytes",
# load_format="bitsandbytes",
)
engine = AsyncLLMEngine.from_engine_args(
engine_args, usage_context=UsageContext.OPENAI_API_SERVER
)
model_config = get_model_config(engine)
request_logger = RequestLogger(max_log_len=2048)
api_server.openai_serving_chat = OpenAIServingChat(
engine,
model_config=model_config,
served_model_names=[MODEL_NAME],
chat_template=None,
response_role="assistant",
lora_modules=[],
prompt_adapters=[],
request_logger=request_logger,
)
api_server.openai_serving_completion = OpenAIServingCompletion(
engine,
model_config=model_config,
served_model_names=[MODEL_NAME],
lora_modules=[],
prompt_adapters=[],
request_logger=request_logger,
)
return web_app
def get_model_config(engine):
import asyncio
try: # adapted from vLLM source -- https://github.com/vllm-project/vllm/blob/507ef787d85dec24490069ffceacbd6b161f4f72/vllm/entrypoints/openai/api_server.py#L235C1-L247C1
event_loop = asyncio.get_running_loop()
except RuntimeError:
event_loop = None
if event_loop is not None and event_loop.is_running():
# If the current is instanced by Ray Serve,
# there is already a running event loop
model_config = event_loop.run_until_complete(engine.get_model_config())
else:
# When using single vLLM without engine_use_ray
model_config = asyncio.run(engine.get_model_config())
return model_config

View File

@@ -1,16 +0,0 @@
LOAD DATABASE
FROM sqlite:///app/reflector.sqlite3
INTO pgsql://reflector:reflector@postgres:5432/reflector
WITH
include drop,
create tables,
create indexes,
reset sequences,
preserve index names,
prefetch rows = 10
SET
work_mem to '512MB',
maintenance_work_mem to '1024MB'
CAST
column transcript.duration to float using (lambda (val) (when val (format nil "~f" val)))
;

View File

@@ -1,9 +1,10 @@
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from reflector.db import metadata
from reflector.settings import settings
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.

View File

@@ -8,7 +8,6 @@ Create Date: 2024-09-24 16:12:56.944133
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.

View File

@@ -5,11 +5,11 @@ Revises: f819277e5169
Create Date: 2023-11-07 11:12:21.614198
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "0fea6d96b096"

View File

@@ -5,26 +5,26 @@ Revises: 0fea6d96b096
Create Date: 2023-11-30 15:56:03.341466
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '125031f7cb78'
down_revision: Union[str, None] = '0fea6d96b096'
revision: str = "125031f7cb78"
down_revision: Union[str, None] = "0fea6d96b096"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('transcript', sa.Column('participants', sa.JSON(), nullable=True))
op.add_column("transcript", sa.Column("participants", sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('transcript', 'participants')
op.drop_column("transcript", "participants")
# ### end Alembic commands ###

View File

@@ -5,6 +5,7 @@ Revises: f819277e5169
Create Date: 2025-06-17 14:00:03.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
@@ -19,16 +20,16 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
'meeting_consent',
sa.Column('id', sa.String(), nullable=False),
sa.Column('meeting_id', sa.String(), nullable=False),
sa.Column('user_id', sa.String(), nullable=True),
sa.Column('consent_given', sa.Boolean(), nullable=False),
sa.Column('consent_timestamp', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['meeting_id'], ['meeting.id']),
"meeting_consent",
sa.Column("id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("user_id", sa.String(), nullable=True),
sa.Column("consent_given", sa.Boolean(), nullable=False),
sa.Column("consent_timestamp", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"]),
)
def downgrade() -> None:
op.drop_table('meeting_consent')
op.drop_table("meeting_consent")

View File

@@ -5,6 +5,7 @@ Revises: 20250617140003
Create Date: 2025-06-18 14:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
@@ -22,4 +23,4 @@ def upgrade() -> None:
def downgrade() -> None:
op.drop_column("transcript", "audio_deleted")
op.drop_column("transcript", "audio_deleted")

View File

@@ -5,36 +5,40 @@ Revises: ccd68dc784ff
Create Date: 2025-07-15 16:53:40.397394
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '2cf0b60a9d34'
down_revision: Union[str, None] = 'ccd68dc784ff'
revision: str = "2cf0b60a9d34"
down_revision: Union[str, None] = "ccd68dc784ff"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('transcript', schema=None) as batch_op:
batch_op.alter_column('duration',
existing_type=sa.INTEGER(),
type_=sa.Float(),
existing_nullable=True)
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.alter_column(
"duration",
existing_type=sa.INTEGER(),
type_=sa.Float(),
existing_nullable=True,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('transcript', schema=None) as batch_op:
batch_op.alter_column('duration',
existing_type=sa.Float(),
type_=sa.INTEGER(),
existing_nullable=True)
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.alter_column(
"duration",
existing_type=sa.Float(),
type_=sa.INTEGER(),
existing_nullable=True,
)
# ### end Alembic commands ###

View File

@@ -5,17 +5,17 @@ Revises: 9920ecfe2735
Create Date: 2023-11-02 19:53:09.116240
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
from alembic import op
from sqlalchemy import select
from sqlalchemy.sql import column, table
# revision identifiers, used by Alembic.
revision: str = '38a927dcb099'
down_revision: Union[str, None] = '9920ecfe2735'
revision: str = "38a927dcb099"
down_revision: Union[str, None] = "9920ecfe2735"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

View File

@@ -5,13 +5,13 @@ Revises: 38a927dcb099
Create Date: 2023-11-10 18:12:17.886522
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
from alembic import op
from sqlalchemy import select
from sqlalchemy.sql import column, table
# revision identifiers, used by Alembic.
revision: str = "4814901632bc"
@@ -24,9 +24,11 @@ def upgrade() -> None:
# for all the transcripts, calculate the duration from the mp3
# and update the duration column
from pathlib import Path
from reflector.settings import settings
import av
from reflector.settings import settings
bind = op.get_bind()
transcript = table(
"transcript", column("id", sa.String), column("duration", sa.Float)

View File

@@ -5,14 +5,11 @@ Revises:
Create Date: 2023-08-29 10:54:45.142974
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '543ed284d69a'
revision: str = "543ed284d69a"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

View File

@@ -8,9 +8,8 @@ Create Date: 2025-06-27 09:04:21.006823
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "62dea3db63a5"

View File

@@ -5,26 +5,28 @@ Revises: 62dea3db63a5
Create Date: 2024-09-06 14:02:06.649665
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '764ce6db4388'
down_revision: Union[str, None] = '62dea3db63a5'
revision: str = "764ce6db4388"
down_revision: Union[str, None] = "62dea3db63a5"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('transcript', sa.Column('zulip_message_id', sa.Integer(), nullable=True))
op.add_column(
"transcript", sa.Column("zulip_message_id", sa.Integer(), nullable=True)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('transcript', 'zulip_message_id')
op.drop_column("transcript", "zulip_message_id")
# ### end Alembic commands ###

View File

@@ -9,8 +9,6 @@ Create Date: 2025-07-15 19:30:19.876332
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "88d292678ba2"
@@ -21,7 +19,7 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
import json
import re
from sqlalchemy import text
# Get database connection
@@ -58,7 +56,9 @@ def upgrade() -> None:
fixed_events = json.dumps(jevents)
assert "NaN" not in fixed_events
except (json.JSONDecodeError, AssertionError) as e:
print(f"Warning: Invalid JSON for transcript {transcript_id}, skipping: {e}")
print(
f"Warning: Invalid JSON for transcript {transcript_id}, skipping: {e}"
)
continue
# Update the record with fixed JSON
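
The assertion `"NaN" not in fixed_events` in the hunk above guards against a quirk of Python's `json` module: by default it both reads and writes the non-standard `NaN` token, which strict JSON parsers reject. A small stdlib-only sketch of that behavior follows; the `sanitize` helper and the sample payload are hypothetical and are not taken from the migration:

```python
import json
import math


def sanitize(value):
    """Recursively replace NaN floats with None (serialized as null)."""
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, list):
        return [sanitize(item) for item in value]
    if isinstance(value, dict):
        return {key: sanitize(item) for key, item in value.items()}
    return value


raw = '{"events": [{"duration": NaN}, {"duration": 1.5}]}'
events = json.loads(raw)  # accepted: Python's parser allows NaN by default
fixed = json.dumps(sanitize(events))

# allow_nan=False makes json.dumps raise ValueError instead of emitting NaN,
# so this call only succeeds because the data has been sanitized.
json.dumps(sanitize(events), allow_nan=False)
```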

View File

@@ -5,13 +5,13 @@ Revises: 99365b0cd87b
Create Date: 2023-11-02 18:55:17.019498
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
from alembic import op
from sqlalchemy import select
from sqlalchemy.sql import column, table
# revision identifiers, used by Alembic.
revision: str = "9920ecfe2735"

View File

@@ -8,8 +8,8 @@ Create Date: 2023-09-01 20:19:47.216334
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "99365b0cd87b"

View File

@@ -9,8 +9,6 @@ Create Date: 2025-07-15 20:09:40.253018
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "a9c9c229ee36"

View File

@@ -5,30 +5,34 @@ Revises: 6ea59639f30e
Create Date: 2025-01-28 10:06:50.446233
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = 'b0e5f7876032'
down_revision: Union[str, None] = '6ea59639f30e'
revision: str = "b0e5f7876032"
down_revision: Union[str, None] = "6ea59639f30e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('meeting', schema=None) as batch_op:
batch_op.add_column(sa.Column('is_active', sa.Boolean(), server_default=sa.text('1'), nullable=False))
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"is_active", sa.Boolean(), server_default=sa.text("1"), nullable=False
)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('meeting', schema=None) as batch_op:
batch_op.drop_column('is_active')
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("is_active")
# ### end Alembic commands ###

View File

@@ -8,9 +8,8 @@ Create Date: 2025-06-27 08:57:16.306940
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b3df9681cae9"

View File

@@ -8,9 +8,8 @@ Create Date: 2024-10-11 13:45:28.914902
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b469348df210"

View File

@@ -0,0 +1,35 @@
"""add_unique_constraint_one_active_meeting_per_room
Revision ID: b7df9609542c
Revises: d7fbb74b673b
Create Date: 2025-07-25 16:27:06.959868
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b7df9609542c"
down_revision: Union[str, None] = "d7fbb74b673b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create a partial unique index that ensures only one active meeting per room
# This works for both PostgreSQL and SQLite
op.create_index(
"idx_one_active_meeting_per_room",
"meeting",
["room_id"],
unique=True,
postgresql_where=sa.text("is_active = true"),
sqlite_where=sa.text("is_active = 1"),
)
def downgrade() -> None:
op.drop_index("idx_one_active_meeting_per_room", table_name="meeting")
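
To see what a partial unique index enforces, the sketch below reproduces the idea with the standard-library `sqlite3` module. The table is a simplified stand-in for `meeting` (only the columns needed for the demonstration), not the project's schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE meeting (id TEXT PRIMARY KEY, room_id TEXT, is_active BOOLEAN)"
)
# Uniqueness applies only to rows matching the WHERE clause.
conn.execute(
    "CREATE UNIQUE INDEX idx_one_active_meeting_per_room "
    "ON meeting (room_id) WHERE is_active = 1"
)

conn.execute("INSERT INTO meeting VALUES ('m1', 'room-a', 1)")
# Any number of inactive meetings may share the room.
conn.execute("INSERT INTO meeting VALUES ('m2', 'room-a', 0)")
conn.execute("INSERT INTO meeting VALUES ('m3', 'room-a', 0)")

try:
    # A second active meeting in the same room violates the index.
    conn.execute("INSERT INTO meeting VALUES ('m4', 'room-a', 1)")
    second_active_allowed = True
except sqlite3.IntegrityError:
    second_active_allowed = False
```

The constraint is enforced by the database itself, so two concurrent requests cannot both create an active meeting for the same room even if application-level checks race.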

View File

@@ -5,25 +5,31 @@ Revises: 125031f7cb78
Create Date: 2023-12-13 15:37:51.303970
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = 'b9348748bbbc'
down_revision: Union[str, None] = '125031f7cb78'
revision: str = "b9348748bbbc"
down_revision: Union[str, None] = "125031f7cb78"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('transcript', sa.Column('reviewed', sa.Boolean(), server_default=sa.text('0'), nullable=False))
op.add_column(
"transcript",
sa.Column(
"reviewed", sa.Boolean(), server_default=sa.text("0"), nullable=False
),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('transcript', 'reviewed')
op.drop_column("transcript", "reviewed")
# ### end Alembic commands ###

View File

@@ -9,8 +9,6 @@ Create Date: 2025-07-15 11:48:42.854741
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "ccd68dc784ff"

View File

@@ -8,9 +8,8 @@ Create Date: 2025-06-27 09:27:25.302152
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "d3ff3a39297f"

View File

@@ -56,4 +56,4 @@ def downgrade() -> None:
op.drop_index("idx_transcript_room_id", "transcript")
# Drop the room_id column
op.drop_column("transcript", "room_id")
op.drop_column("transcript", "room_id")

View File

@@ -5,11 +5,11 @@ Revises: 4814901632bc
Create Date: 2023-11-16 10:29:09.351664
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f819277e5169"

View File

@@ -38,6 +38,8 @@ dependencies = [
"jsonschema>=4.23.0",
"openai>=1.59.7",
"psycopg2-binary>=2.9.10",
"llama-index>=0.12.52",
"llama-index-llms-openai-like>=0.4.0",
]
[dependency-groups]

View File

@@ -1,12 +1,13 @@
from contextlib import asynccontextmanager
import reflector.auth # noqa
import reflector.db # noqa
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.routing import APIRoute
from fastapi_pagination import add_pagination
from prometheus_fastapi_instrumentator import Instrumentator
import reflector.auth # noqa
import reflector.db # noqa
from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.logger import logger
from reflector.metrics import metrics_init

View File

@@ -1,7 +1,8 @@
from reflector.settings import settings
from reflector.logger import logger
import importlib
from reflector.logger import logger
from reflector.settings import settings
logger.info(f"User authentication using {settings.AUTH_BACKEND}")
module_name = f"reflector.auth.auth_{settings.AUTH_BACKEND}"
auth_module = importlib.import_module(module_name)

View File

@@ -4,6 +4,7 @@ from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings

View File

@@ -1,7 +1,8 @@
from pydantic import BaseModel
from typing import Annotated
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)

View File

@@ -1,12 +1,12 @@
import argparse
import asyncio
import signal
from typing import NoReturn
from aiortc.contrib.signaling import add_signaling_arguments, create_signaling
from reflector.logger import logger
from reflector.stream_client import StreamClient
from typing import NoReturn
async def main() -> NoReturn:
@@ -51,7 +51,7 @@ async def main() -> NoReturn:
logger.info(f"Cancelling {len(tasks)} outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
logger.info(f'{"Flushing metrics"}')
logger.info(f"{'Flushing metrics'}")
loop.stop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)

View File

@@ -1,5 +1,6 @@
import databases
import sqlalchemy
from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.settings import settings

View File

@@ -4,6 +4,7 @@ from typing import Literal
import sqlalchemy as sa
from fastapi import HTTPException
from pydantic import BaseModel, Field
from reflector.db import database, metadata
from reflector.db.rooms import Room
from reflector.utils import generate_uuid4

View File

@@ -3,6 +3,7 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from reflector.db import database, metadata
from reflector.utils import generate_uuid4

View File

@@ -5,9 +5,10 @@ from typing import Literal
import sqlalchemy
from fastapi import HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.sql import false, or_
from reflector.db import database, metadata
from reflector.utils import generate_uuid4
from sqlalchemy.sql import false, or_
rooms = sqlalchemy.Table(
"room",

View File

@@ -10,13 +10,14 @@ from typing import Any, Literal
import sqlalchemy
from fastapi import HTTPException
from pydantic import BaseModel, ConfigDict, Field, field_serializer
from sqlalchemy import Enum
from sqlalchemy.sql import false, or_
from reflector.db import database, metadata
from reflector.processors.types import Word as ProcessorWord
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.utils import generate_uuid4
from sqlalchemy import Enum
from sqlalchemy.sql import false, or_
class SourceKind(enum.StrEnum):

View File

@@ -5,11 +5,12 @@ from typing import TypeVar
import nltk
from prometheus_client import Counter, Histogram
from transformers import GenerationConfig
from reflector.llm.llm_params import TaskParams
from reflector.logger import logger as reflector_logger
from reflector.settings import settings
from reflector.utils.retry import retry
from transformers import GenerationConfig
T = TypeVar("T", bound="LLM")
@@ -17,6 +18,7 @@ T = TypeVar("T", bound="LLM")
class LLM:
_nltk_downloaded = False
_registry = {}
model_name: str
m_generate = Histogram(
"llm_generate",
"Time spent in LLM.generate",
@@ -60,7 +62,7 @@ class LLM:
Return an instance depending on the settings.
Settings used:
- `LLM_BACKEND`: key of the backend, defaults to `oobabooga`
- `LLM_BACKEND`: key of the backend
- `LLM_URL`: url of the backend
"""
if name is None:
@@ -69,6 +71,7 @@ class LLM:
module_name = f"reflector.llm.llm_{name}"
importlib.import_module(module_name)
cls.ensure_nltk()
return cls._registry[name](model_name)
def get_model_name(self) -> str:
@@ -121,6 +124,11 @@ class LLM:
def _get_tokenizer(self):
pass
def has_structured_output(self):
# whether implementation supports structured output
# on the model side (otherwise it's prompt engineering)
return False
async def generate(
self,
prompt: str,
@@ -140,6 +148,7 @@ class LLM:
prompt=prompt,
gen_schema=gen_schema,
gen_cfg=gen_cfg,
logger=logger,
**kwargs,
)
self.m_generate_success.inc()
@@ -167,7 +176,9 @@ class LLM:
try:
with self.m_generate.time():
result = await retry(self._completion)(messages=messages, **kwargs)
result = await retry(self._completion)(
messages=messages, **{**kwargs, "logger": logger}
)
self.m_generate_success.inc()
except Exception:
logger.exception("Failed to call llm after retrying")
@@ -253,9 +264,7 @@ class LLM:
) -> str:
raise NotImplementedError
async def _completion(
self, messages: list, logger: reflector_logger, **kwargs
) -> dict:
async def _completion(self, messages: list, **kwargs) -> dict:
raise NotImplementedError
def _parse_json(self, result: str) -> dict:

View File

@@ -1,9 +1,10 @@
import httpx
from transformers import AutoTokenizer, GenerationConfig
from reflector.llm.base import LLM
from reflector.logger import logger as reflector_logger
from reflector.settings import settings
from reflector.utils.retry import retry
from transformers import AutoTokenizer, GenerationConfig
class ModalLLM(LLM):
@@ -31,7 +32,7 @@ class ModalLLM(LLM):
async def _generate(
self, prompt: str, gen_schema: dict | None, gen_cfg: dict | None, **kwargs
):
) -> str:
json_payload = {"prompt": prompt}
if gen_schema:
json_payload["gen_schema"] = gen_schema
@@ -52,12 +53,14 @@ class ModalLLM(LLM):
timeout=self.timeout,
retry_timeout=60 * 5,
follow_redirects=True,
logger=kwargs.get("logger", reflector_logger),
)
response.raise_for_status()
text = response.json()["text"]
return text
async def _completion(self, messages: list, **kwargs) -> dict:
# returns full api response
kwargs.setdefault("temperature", 0.3)
kwargs.setdefault("max_tokens", 2048)
kwargs.setdefault("stream", False)
@@ -78,6 +81,7 @@ class ModalLLM(LLM):
timeout=self.timeout,
retry_timeout=60 * 5,
follow_redirects=True,
logger=kwargs.get("logger", reflector_logger),
)
response.raise_for_status()
return response.json()

View File

@@ -1,29 +0,0 @@
import httpx
from reflector.llm.base import LLM
from reflector.settings import settings
class OobaboogaLLM(LLM):
def __init__(self, model_name: str | None = None):
super().__init__()
async def _generate(
self, prompt: str, gen_schema: dict | None, gen_cfg: dict | None, **kwargs
):
json_payload = {"prompt": prompt}
if gen_schema:
json_payload["gen_schema"] = gen_schema
if gen_cfg:
json_payload.update(gen_cfg)
async with httpx.AsyncClient() as client:
response = await client.post(
settings.LLM_URL,
headers={"Content-Type": "application/json"},
json=json_payload,
)
response.raise_for_status()
return response.json()
LLM.register("oobabooga", OobaboogaLLM)

View File

@@ -0,0 +1,118 @@
import httpx
from transformers import AutoTokenizer
from reflector.logger import logger
def apply_gen_config(payload: dict, gen_cfg) -> None:
"""Apply generation config overrides to the payload."""
config_mapping = {
"temperature": "temperature",
"max_new_tokens": "max_tokens",
"max_tokens": "max_tokens",
"top_p": "top_p",
"frequency_penalty": "frequency_penalty",
"presence_penalty": "presence_penalty",
}
for cfg_attr, payload_key in config_mapping.items():
value = getattr(gen_cfg, cfg_attr, None)
if value is not None:
payload[payload_key] = value
if cfg_attr == "max_new_tokens": # Handle max_new_tokens taking precedence
break
class OpenAILLM:
def __init__(self, config_prefix: str, settings):
self.config_prefix = config_prefix
self.settings_obj = settings
self.model_name = getattr(settings, f"{config_prefix}_MODEL")
self.url = getattr(settings, f"{config_prefix}_LLM_URL")
self.api_key = getattr(settings, f"{config_prefix}_LLM_API_KEY")
timeout = getattr(settings, f"{config_prefix}_LLM_TIMEOUT", 300)
self.temperature = getattr(settings, f"{config_prefix}_LLM_TEMPERATURE", 0.7)
self.max_tokens = getattr(settings, f"{config_prefix}_LLM_MAX_TOKENS", 1024)
self.client = httpx.AsyncClient(timeout=timeout)
# Use a tokenizer that approximates OpenAI token counting
tokenizer_name = getattr(settings, f"{config_prefix}_TOKENIZER", "gpt2")
try:
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
except Exception:
logger.debug(
f"Failed to load tokenizer '{tokenizer_name}', falling back to default 'gpt2' tokenizer"
)
self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
async def generate(
self, prompt: str, gen_schema=None, gen_cfg=None, logger=None
) -> str:
if logger:
logger.debug(
"OpenAI LLM generate",
prompt=repr(prompt[:100] + "..." if len(prompt) > 100 else prompt),
)
messages = [{"role": "user", "content": prompt}]
result = await self.completion(
messages, gen_schema=gen_schema, gen_cfg=gen_cfg, logger=logger
)
return result["choices"][0]["message"]["content"]
async def completion(
self, messages: list, gen_schema=None, gen_cfg=None, logger=None, **kwargs
) -> dict:
if logger:
logger.info("OpenAI LLM completion", messages_count=len(messages))
payload = {
"model": self.model_name,
"messages": messages,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
}
# Apply generation config overrides
if gen_cfg:
apply_gen_config(payload, gen_cfg)
# Apply structured output schema
if gen_schema:
payload["response_format"] = {
"type": "json_schema",
"json_schema": {"name": "response", "schema": gen_schema},
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
url = f"{self.url.rstrip('/')}/chat/completions"
if logger:
logger.debug(
"OpenAI API request", url=url, payload_keys=list(payload.keys())
)
response = await self.client.post(url, json=payload, headers=headers)
response.raise_for_status()
result = response.json()
if logger:
logger.debug(
"OpenAI API response",
status_code=response.status_code,
choices_count=len(result.get("choices", [])),
)
return result
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.aclose()
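
Review note (a sketch, not part of this commit): in `apply_gen_config` above, the `break` that follows `max_new_tokens` ends the whole loop, so the keys listed after it in `config_mapping` (`top_p`, `frequency_penalty`, `presence_penalty`) look like they are skipped once it fires. Indentation is lost in this view, so the exact condition is uncertain. The variant below assumes the intent is only that `max_new_tokens` takes precedence over `max_tokens`. The name `apply_gen_config_sketch` and the `SimpleNamespace` stand-in for `gen_cfg` are hypothetical.

```python
from types import SimpleNamespace


def apply_gen_config_sketch(payload: dict, gen_cfg) -> None:
    """Copy generation overrides from gen_cfg into an OpenAI-style payload.

    Hypothetical variant: max_new_tokens wins over max_tokens, and the
    remaining keys are still applied afterwards.
    """
    # Keys that carry the same name on both sides.
    for key in ("temperature", "top_p", "frequency_penalty", "presence_penalty"):
        value = getattr(gen_cfg, key, None)
        if value is not None:
            payload[key] = value

    # Prefer max_new_tokens; fall back to max_tokens when it is unset.
    max_new = getattr(gen_cfg, "max_new_tokens", None)
    max_tok = getattr(gen_cfg, "max_tokens", None)
    limit = max_new if max_new is not None else max_tok
    if limit is not None:
        payload["max_tokens"] = limit


# Usage: start from defaults like the ones completion() builds.
payload = {"temperature": 0.7, "max_tokens": 1024}
cfg = SimpleNamespace(max_new_tokens=256, max_tokens=512, top_p=0.9)
apply_gen_config_sketch(payload, cfg)
```

Handling the token limit outside the loop removes the need for an early exit, which is what would otherwise drop the later keys.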

View File

@@ -16,8 +16,10 @@ import functools
from contextlib import asynccontextmanager
import boto3
from celery import chord, group, shared_task
from celery import chord, current_task, group, shared_task
from pydantic import BaseModel
from structlog import BoundLogger as Logger
from reflector.db.meetings import meeting_consent_controller, meetings_controller
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
@@ -61,7 +63,6 @@ from reflector.zulip import (
send_message_to_zulip,
update_zulip_message,
)
from structlog import BoundLogger as Logger
def asynctask(f):
@@ -111,16 +112,29 @@ def get_transcript(func):
Decorator to fetch the transcript from the database from the first argument
"""
@functools.wraps(func)
async def wrapper(**kwargs):
transcript_id = kwargs.pop("transcript_id")
transcript = await transcripts_controller.get_by_id(transcript_id=transcript_id)
if not transcript:
raise Exception(f"Transcript {transcript_id} not found")
# Enhanced logger with Celery task context
tlogger = logger.bind(transcript_id=transcript.id)
if current_task:
tlogger = tlogger.bind(
task_id=current_task.request.id,
task_name=current_task.name,
worker_hostname=current_task.request.hostname,
task_retries=current_task.request.retries,
transcript_id=transcript_id,
)
try:
return await func(transcript=transcript, logger=tlogger, **kwargs)
result = await func(transcript=transcript, logger=tlogger, **kwargs)
return result
except Exception as exc:
tlogger.error("Pipeline error", exc_info=exc)
tlogger.error("Pipeline error", function_name=func.__name__, exc_info=exc)
raise
return wrapper
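
Review note (a sketch, not the project's code): the `get_transcript` decorator above pops `transcript_id`, loads the transcript, and hands the wrapped coroutine a logger that already carries that context. The self-contained version below illustrates the same pattern under stated assumptions: `FAKE_TRANSCRIPTS` is a hypothetical in-memory stand-in for `transcripts_controller`, and the standard library's `logging.LoggerAdapter` replaces the structlog `bind()` call so the example runs without third-party packages.

```python
import asyncio
import functools
import logging

# Hypothetical in-memory stand-in for transcripts_controller.
FAKE_TRANSCRIPTS = {"t1": {"id": "t1", "title": "Weekly sync"}}


def get_transcript_sketch(func):
    """Resolve `transcript_id` into a transcript and pass a contextual logger."""

    @functools.wraps(func)
    async def wrapper(**kwargs):
        transcript_id = kwargs.pop("transcript_id")
        transcript = FAKE_TRANSCRIPTS.get(transcript_id)
        if not transcript:
            raise LookupError(f"Transcript {transcript_id} not found")
        # LoggerAdapter attaches the context dict to every record it emits.
        tlogger = logging.LoggerAdapter(
            logging.getLogger("pipeline"), {"transcript_id": transcript_id}
        )
        try:
            return await func(transcript=transcript, logger=tlogger, **kwargs)
        except Exception:
            # Log with the function name, then let the caller see the error.
            tlogger.exception("Pipeline error in %s", func.__name__)
            raise

    return wrapper


@get_transcript_sketch
async def title_of(transcript, logger):
    logger.info("reading title")
    return transcript["title"]


result = asyncio.run(title_of(transcript_id="t1"))
```

Callers pass only `transcript_id`; the wrapped function never has to repeat the lookup or rebuild the logging context.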

View File

@@ -18,6 +18,7 @@ During its lifecycle, it will emit the following status:
import asyncio
from pydantic import BaseModel, ConfigDict
from reflector.logger import logger
from reflector.processors import Pipeline

View File

@@ -1,6 +1,7 @@
from reflector.processors.base import Processor
import av
from reflector.processors.base import Processor
class AudioChunkerProcessor(Processor):
"""

View File

@@ -1,4 +1,5 @@
import httpx
from reflector.processors.audio_diarization import AudioDiarizationProcessor
from reflector.processors.audio_diarization_auto import AudioDiarizationAutoProcessor
from reflector.processors.types import AudioDiarizationInput, TitleSummary

View File

@@ -1,6 +1,7 @@
from pathlib import Path
import av
from reflector.processors.base import Processor

View File

@@ -1,10 +1,12 @@
from reflector.processors.base import Processor
from reflector.processors.types import AudioFile
import io
from time import monotonic_ns
from uuid import uuid4
import io
import av
from reflector.processors.base import Processor
from reflector.processors.types import AudioFile
class AudioMergeProcessor(Processor):
"""

View File

@@ -1,4 +1,5 @@
from prometheus_client import Counter, Histogram
from reflector.processors.base import Processor
from reflector.processors.types import AudioFile, Transcript

View File

@@ -13,6 +13,7 @@ API will be a POST request to TRANSCRIPT_URL:
"""
from openai import AsyncOpenAI
from reflector.processors.audio_transcript import AudioTranscriptProcessor
from reflector.processors.audio_transcript_auto import AudioTranscriptAutoProcessor
from reflector.processors.types import AudioFile, Transcript, Word

View File

@@ -1,4 +1,5 @@
from faster_whisper import WhisperModel
from reflector.processors.audio_transcript import AudioTranscriptProcessor
from reflector.processors.audio_transcript_auto import AudioTranscriptAutoProcessor
from reflector.processors.types import AudioFile, Transcript, Word

View File

@@ -5,6 +5,7 @@ from uuid import uuid4
from prometheus_client import Counter, Gauge, Histogram
from pydantic import BaseModel
from reflector.logger import logger

File diff suppressed because it is too large

View File

@@ -1,7 +1,8 @@
from reflector.llm import LLM
from reflector.llm.openai_llm import OpenAILLM
from reflector.processors.base import Processor
from reflector.processors.summary.summary_builder import SummaryBuilder
from reflector.processors.types import FinalLongSummary, FinalShortSummary, TitleSummary
from reflector.settings import settings
class TranscriptFinalSummaryProcessor(Processor):
@@ -16,14 +17,14 @@ class TranscriptFinalSummaryProcessor(Processor):
super().__init__(**kwargs)
self.transcript = transcript
self.chunks: list[TitleSummary] = []
self.llm = LLM.get_instance(model_name="NousResearch/Hermes-3-Llama-3.1-8B")
self.llm = OpenAILLM(config_prefix="SUMMARY", settings=settings)
self.builder = None
async def _push(self, data: TitleSummary):
self.chunks.append(data)
async def get_summary_builder(self, text) -> SummaryBuilder:
builder = SummaryBuilder(self.llm)
builder = SummaryBuilder(self.llm, logger=self.logger)
builder.set_transcript(text)
await builder.identify_participants()
await builder.generate_summary()

View File

@@ -49,7 +49,7 @@ class TranscriptFinalTitleProcessor(Processor):
gen_cfg=self.params.gen_cfg,
logger=self.logger,
)
accumulated_titles += title_result["summary"]
accumulated_titles += title_result["title"]
return await self.get_title(accumulated_titles)

View File

@@ -1,4 +1,5 @@
import httpx
from reflector.processors.base import Processor
from reflector.processors.types import Transcript, TranslationLanguages
from reflector.settings import settings
@@ -52,6 +53,7 @@ class TranscriptTranslatorProcessor(Processor):
params=json_payload,
timeout=self.timeout,
follow_redirects=True,
logger=self.logger,
)
response.raise_for_status()
result = response.json()["text"]

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from profanityfilter import ProfanityFilter
from pydantic import BaseModel, PrivateAttr
from reflector.redis_cache import redis_cache
PUNC_RE = re.compile(r"[.;:?!…]")

View File

@@ -2,6 +2,7 @@ import functools
import json
import redis
from reflector.settings import settings
redis_clients = {}

View File

@@ -8,8 +8,6 @@ class Settings(BaseSettings):
extra="ignore",
)
OPENMP_KMP_DUPLICATE_LIB_OK: bool = False
# CORS
CORS_ORIGIN: str = "*"
CORS_ALLOW_CREDENTIALS: bool = False
@@ -20,26 +18,6 @@ class Settings(BaseSettings):
# local data directory (audio for now)
DATA_DIR: str = "./data"
# Whisper
WHISPER_MODEL_SIZE: str = "tiny"
WHISPER_REAL_TIME_MODEL_SIZE: str = "tiny"
# Summarizer
SUMMARIZER_MODEL: str = "facebook/bart-large-cnn"
SUMMARIZER_INPUT_ENCODING_MAX_LENGTH: int = 1024
SUMMARIZER_MAX_LENGTH: int = 2048
SUMMARIZER_BEAM_SIZE: int = 6
SUMMARIZER_MAX_CHUNK_LENGTH: int = 1024
SUMMARIZER_USING_CHUNKS: bool = True
# Audio
AUDIO_BLACKHOLE_INPUT_AGGREGATOR_DEVICE_NAME: str = "aggregator"
AUDIO_AV_FOUNDATION_DEVICE_ID: int = 1
AUDIO_CHANNELS: int = 2
AUDIO_SAMPLING_RATE: int = 48000
AUDIO_SAMPLING_WIDTH: int = 2
AUDIO_BUFFER_SIZE: int = 256 * 960
# Audio Transcription
# backends: whisper, modal
TRANSCRIPT_BACKEND: str = "whisper"
@@ -63,8 +41,8 @@ class Settings(BaseSettings):
TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
# LLM
# available backend: openai, modal, oobabooga
LLM_BACKEND: str = "oobabooga"
# available backend: openai, modal
LLM_BACKEND: str = "modal"
# LLM common configuration
LLM_URL: str | None = None
@@ -82,6 +60,12 @@ class Settings(BaseSettings):
# LLM Modal configuration
LLM_MODAL_API_KEY: str | None = None
# per-task cases
SUMMARY_MODEL: str = "monadical/private/smart"
SUMMARY_LLM_URL: str | None = None
SUMMARY_LLM_API_KEY: str | None = None
SUMMARY_LLM_CONTEXT_SIZE_TOKENS: int = 16000
# Diarization
DIARIZATION_ENABLED: bool = True
DIARIZATION_BACKEND: str = "modal"
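
Review note (a sketch under stated assumptions): the new `SUMMARY_*` settings above are read by `OpenAILLM` through a `config_prefix`, using `getattr(settings, f"{config_prefix}_...")`. The snippet below illustrates that lookup in isolation. `read_llm_config` is a hypothetical helper, the `SimpleNamespace` object stands in for the real `Settings` class, and the URL is a placeholder value chosen for the example.

```python
from types import SimpleNamespace


def read_llm_config(settings, config_prefix: str) -> dict:
    """Collect the per-task LLM settings that share one prefix.

    Required keys raise AttributeError when missing; optional keys
    fall back to the defaults used in the OpenAILLM constructor.
    """
    return {
        "model": getattr(settings, f"{config_prefix}_MODEL"),
        "url": getattr(settings, f"{config_prefix}_LLM_URL"),
        "api_key": getattr(settings, f"{config_prefix}_LLM_API_KEY"),
        "timeout": getattr(settings, f"{config_prefix}_LLM_TIMEOUT", 300),
        "temperature": getattr(settings, f"{config_prefix}_LLM_TEMPERATURE", 0.7),
        "max_tokens": getattr(settings, f"{config_prefix}_LLM_MAX_TOKENS", 1024),
    }


# Stand-in settings object; only the SUMMARY_* names come from the diff.
fake_settings = SimpleNamespace(
    SUMMARY_MODEL="monadical/private/smart",
    SUMMARY_LLM_URL="http://localhost:8000/v1",  # placeholder URL
    SUMMARY_LLM_API_KEY=None,
)
cfg = read_llm_config(fake_settings, "SUMMARY")
```

With this convention, another task could get its own model by defining the same set of names under a different prefix, without touching the client class.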

View File

@@ -1,6 +1,7 @@
import importlib
from pydantic import BaseModel
from reflector.settings import settings

View File

@@ -1,4 +1,5 @@
import aioboto3
from reflector.logger import logger
from reflector.storage.base import FileResult, Storage

View File

@@ -1,6 +1,7 @@
import asyncio
import time
import uuid
from os import environ
import httpx
import stamina
@@ -8,7 +9,6 @@ from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRelay
from reflector.logger import logger
from reflector.settings import settings
class StreamClient:
@@ -43,8 +43,9 @@ class StreamClient:
else:
if self.relay is None:
self.relay = MediaRelay()
audio_device_id = int(environ.get("AUDIO_AV_FOUNDATION_DEVICE_ID", 1))
self.player = MediaPlayer(
f":{settings.AUDIO_AV_FOUNDATION_DEVICE_ID}",
f":{audio_device_id}",
format="avfoundation",
options={"channels": "2"},
)
@@ -126,7 +127,7 @@ class StreamClient:
answer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
await pc.setRemoteDescription(answer)
self.reader = self.worker(f'{"worker"}', self.queue)
self.reader = self.worker(f"{'worker'}", self.queue)
def get_reader(self):
return self.reader

View File

@@ -36,9 +36,13 @@ async def export_db(filename: str) -> None:
if entry["event"] == "TRANSCRIPT":
yield tid, "event_transcript", idx, "text", entry["data"]["text"]
if entry["data"].get("translation") is not None:
yield tid, "event_transcript", idx, "translation", entry[
"data"
].get("translation", None)
yield (
tid,
"event_transcript",
idx,
"translation",
entry["data"].get("translation", None),
)
def export_transcripts(transcripts):
for transcript in transcripts:

View File

@@ -1,6 +1,7 @@
import asyncio
import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerProcessor,

View File

@@ -0,0 +1,316 @@
"""
@vibe-generated
Process audio file with diarization support
===========================================
Extended version of process.py that includes speaker diarization.
This tool processes audio files locally without requiring the full server infrastructure.
"""
import asyncio
import tempfile
import uuid
from pathlib import Path
from typing import List
import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
Pipeline,
PipelineEvent,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor,
TranscriptTranslatorProcessor,
)
from reflector.processors.base import BroadcastProcessor, Processor
from reflector.processors.types import (
AudioDiarizationInput,
TitleSummary,
TitleSummaryWithId,
)
class TopicCollectorProcessor(Processor):
"""Collect topics for diarization"""
INPUT_TYPE = TitleSummary
OUTPUT_TYPE = TitleSummary
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.topics: List[TitleSummaryWithId] = []
self._topic_id = 0
async def _push(self, data: TitleSummary):
# Convert to TitleSummaryWithId and collect
self._topic_id += 1
topic_with_id = TitleSummaryWithId(
id=str(self._topic_id),
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
duration=data.duration,
transcript=data.transcript,
)
self.topics.append(topic_with_id)
# Pass through the original topic
await self.emit(data)
def get_topics(self) -> List[TitleSummaryWithId]:
return self.topics
async def process_audio_file_with_diarization(
filename,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
):
# Create temp file for audio if diarization is enabled
audio_temp_path = None
if enable_diarization:
audio_temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
audio_temp_path = audio_temp_file.name
audio_temp_file.close()
# Create processor for collecting topics
topic_collector = TopicCollectorProcessor()
# Build pipeline for audio processing
processors = []
# Add audio file writer at the beginning if diarization is enabled
if enable_diarization:
processors.append(AudioFileWriterProcessor(audio_temp_path))
# Add the rest of the processors
processors += [
AudioChunkerProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
]
processors += [
TranscriptLinerProcessor(),
TranscriptTranslatorProcessor.as_threaded(),
]
if not only_transcript:
processors += [
TranscriptTopicDetectorProcessor.as_threaded(),
# Collect topics for diarization
topic_collector,
BroadcastProcessor(
processors=[
TranscriptFinalTitleProcessor.as_threaded(),
TranscriptFinalSummaryProcessor.as_threaded(),
],
),
]
# Create main pipeline
pipeline = Pipeline(*processors)
pipeline.set_pref("audio:source_language", source_language)
pipeline.set_pref("audio:target_language", target_language)
pipeline.describe()
pipeline.on(event_callback)
# Start processing audio
logger.info(f"Opening {filename}")
container = av.open(filename)
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
await pipeline.flush()
# Run diarization if enabled and we have topics
if enable_diarization and not only_transcript and audio_temp_path:
topics = topic_collector.get_topics()
if topics:
logger.info(f"Starting diarization with {len(topics)} topics")
try:
# Import diarization processor
from reflector.processors import AudioDiarizationAutoProcessor
# Create diarization processor
diarization_processor = AudioDiarizationAutoProcessor(
name=diarization_backend
)
diarization_processor.on(event_callback)
# For Modal backend, we need to upload the file to S3 first
if diarization_backend == "modal":
from datetime import datetime
from reflector.storage import get_transcripts_storage
from reflector.utils.s3_temp_file import S3TemporaryFile
storage = get_transcripts_storage()
# Generate a unique filename in evaluation folder
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
audio_filename = f"evaluation/diarization_temp/{timestamp}_{uuid.uuid4().hex}.wav"
# Use context manager for automatic cleanup
async with S3TemporaryFile(storage, audio_filename) as s3_file:
# Read and upload the audio file
with open(audio_temp_path, "rb") as f:
audio_data = f.read()
audio_url = await s3_file.upload(audio_data)
logger.info(f"Uploaded audio to S3: {audio_filename}")
# Create diarization input with S3 URL
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
# File will be automatically cleaned up when exiting the context
else:
# For local backend, use local file path
audio_url = audio_temp_path
# Create diarization input
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
except ImportError as e:
logger.error(f"Failed to import diarization dependencies: {e}")
logger.error(
"Install with: uv pip install pyannote.audio torch torchaudio"
)
logger.error(
"And set HF_TOKEN environment variable for pyannote models"
)
raise SystemExit(1)
except Exception as e:
logger.error(f"Diarization failed: {e}")
raise SystemExit(1)
else:
logger.warning("Skipping diarization: no topics available")
# Clean up temp file
if audio_temp_path:
try:
Path(audio_temp_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {audio_temp_path}: {e}")
logger.info("All done!")
if __name__ == "__main__":
import argparse
import os
parser = argparse.ArgumentParser(
description="Process audio files with optional speaker diarization"
)
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
parser.add_argument(
"--only-transcript",
"-t",
action="store_true",
help="Only generate transcript without topics/summaries",
)
parser.add_argument(
"--source-language", default="en", help="Source language code (default: en)"
)
parser.add_argument(
"--target-language", default="en", help="Target language code (default: en)"
)
parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
parser.add_argument(
"--enable-diarization",
"-d",
action="store_true",
help="Enable speaker diarization",
)
parser.add_argument(
"--diarization-backend",
default="modal",
choices=["modal"],
help="Diarization backend to use (default: modal)",
)
args = parser.parse_args()
# Set REDIS_HOST to localhost if not provided
if "REDIS_HOST" not in os.environ:
os.environ["REDIS_HOST"] = "localhost"
logger.info("REDIS_HOST not set, defaulting to localhost")
output_fd = None
if args.output:
output_fd = open(args.output, "w")
async def event_callback(event: PipelineEvent):
processor = event.processor
data = event.data
# Ignore internal processors
if processor in (
"AudioChunkerProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",
"BroadcastProcessor",
):
return
# If diarization is enabled, skip the original topic events from the pipeline
# The diarization processor will emit the same topics but with speaker info
if processor == "TranscriptTopicDetectorProcessor" and args.enable_diarization:
return
# Log all events
logger.info(f"Event: {processor} - {type(data).__name__}")
# Write to output
if output_fd:
output_fd.write(event.model_dump_json())
output_fd.write("\n")
output_fd.flush()
asyncio.run(
process_audio_file_with_diarization(
args.source,
event_callback,
only_transcript=args.only_transcript,
source_language=args.source_language,
target_language=args.target_language,
enable_diarization=args.enable_diarization,
diarization_backend=args.diarization_backend,
)
)
if output_fd:
output_fd.close()
logger.info(f"Output written to {args.output}")

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
@vibe-generated
Test script for the diarization CLI tool
=========================================
This script helps test the diarization functionality with sample audio files.
"""
import asyncio
import sys
from pathlib import Path
from reflector.logger import logger
async def test_diarization(audio_file: str):
"""Test the diarization functionality"""
# Import the processing function
from process_with_diarization import process_audio_file_with_diarization
# Collect events
events = []
async def event_callback(event):
events.append({"processor": event.processor, "data": event.data})
logger.info(f"Event from {event.processor}")
# Process the audio file
logger.info(f"Processing audio file: {audio_file}")
try:
await process_audio_file_with_diarization(
audio_file,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
)
# Analyze results
logger.info(f"Processing complete. Received {len(events)} events")
# Look for diarization results
diarized_topics = []
for event in events:
if "TitleSummary" in event["processor"]:
# Check if words have speaker information
if hasattr(event["data"], "transcript") and event["data"].transcript:
words = event["data"].transcript.words
if words and hasattr(words[0], "speaker"):
speakers = set(
w.speaker for w in words if hasattr(w, "speaker")
)
logger.info(
f"Found {len(speakers)} speakers in topic: {event['data'].title}"
)
diarized_topics.append(event["data"])
if diarized_topics:
logger.info(f"Successfully diarized {len(diarized_topics)} topics")
# Print sample output
sample_topic = diarized_topics[0]
logger.info("Sample diarized output:")
for i, word in enumerate(sample_topic.transcript.words[:10]):
logger.info(f" Word {i}: '{word.text}' - Speaker {word.speaker}")
else:
logger.warning("No diarization results found in output")
return events
except Exception as e:
logger.error(f"Error during processing: {e}")
raise
def main():
if len(sys.argv) < 2:
print("Usage: python test_diarization.py <audio_file>")
sys.exit(1)
audio_file = sys.argv[1]
if not Path(audio_file).exists():
print(f"Error: Audio file '{audio_file}' not found")
sys.exit(1)
# Run the test
asyncio.run(test_diarization(audio_file))
if __name__ == "__main__":
main()

View File

@@ -1,59 +0,0 @@
"""
Utility file for file handling related functions, including file downloads and
uploads to cloud storage
"""
import sys
from typing import List, NoReturn
import boto3
import botocore
from .log_utils import LOGGER
from .run_utils import SECRETS
BUCKET_NAME = SECRETS["AWS-S3"]["BUCKET_NAME"]
s3 = boto3.client(
"s3",
aws_access_key_id=SECRETS["AWS-S3"]["AWS_ACCESS_KEY"],
aws_secret_access_key=SECRETS["AWS-S3"]["AWS_SECRET_KEY"],
)
def upload_files(files_to_upload: List[str]) -> NoReturn:
"""
Upload a list of files to the configured S3 bucket
:param files_to_upload: List of files to upload
:return: None
"""
for key in files_to_upload:
LOGGER.info("Uploading file " + key)
try:
s3.upload_file(key, BUCKET_NAME, key)
except botocore.exceptions.ClientError as exception:
print(exception.response)
def download_files(files_to_download: List[str]) -> NoReturn:
"""
Download a list of files from the configured S3 bucket
:param files_to_download: List of files to download
:return: None
"""
for key in files_to_download:
LOGGER.info("Downloading file " + key)
try:
s3.download_file(BUCKET_NAME, key, key)
except botocore.exceptions.ClientError as exception:
if exception.response["Error"]["Code"] == "404":
print("The object does not exist.")
else:
raise
if __name__ == "__main__":
if sys.argv[1] == "download":
download_files([sys.argv[2]])
elif sys.argv[1] == "upload":
upload_files([sys.argv[2]])

View File

@@ -1,38 +0,0 @@
"""
Utility function to format the artefacts created during Reflector run
"""
import json
with open("../artefacts/meeting_titles_and_summaries.txt", "r", encoding="utf-8") as f:
outputs = f.read()
outputs = json.loads(outputs)
transcript_file = open("../artefacts/meeting_transcript.txt", "a", encoding="utf-8")
title_desc_file = open(
"../artefacts/meeting_title_description.txt", "a", encoding="utf-8"
)
summary_file = open("../artefacts/meeting_summary.txt", "a", encoding="utf-8")
for item in outputs["topics"]:
transcript_file.write(item["transcript"])
summary_file.write(item["description"])
title_desc_file.write("TITLE: \n")
title_desc_file.write(item["title"])
title_desc_file.write("\n")
title_desc_file.write("DESCRIPTION: \n")
title_desc_file.write(item["description"])
title_desc_file.write("\n")
title_desc_file.write("TRANSCRIPT: \n")
title_desc_file.write(item["transcript"])
title_desc_file.write("\n")
title_desc_file.write("---------------------------------------- \n\n")
transcript_file.close()
title_desc_file.close()
summary_file.close()

View File

@@ -1,8 +1,10 @@
from reflector.logger import logger
from time import monotonic
from httpx import HTTPStatusError, Response
from random import random
import asyncio
from random import random
from time import monotonic
from httpx import HTTPStatusError, Response
from reflector.logger import logger
class RetryException(Exception):
@@ -34,6 +36,7 @@ def retry(fn):
),
)
retry_ignore_exc_types = kwargs.pop("retry_ignore_exc_types", (Exception,))
retry_logger = kwargs.pop("logger", logger)
result = None
last_exception = None
@@ -58,17 +61,33 @@ def retry(fn):
if result:
return result
except HTTPStatusError as e:
logger.exception(e)
retry_logger.exception(e)
status_code = e.response.status_code
logger.debug(f"HTTP status {status_code} - {e}")
# Log detailed error information including response body
try:
response_text = e.response.text
response_headers = dict(e.response.headers)
retry_logger.error(
f"HTTP {status_code} error for {e.request.method} {e.request.url}\n"
f"Response headers: {response_headers}\n"
f"Response body: {response_text}"
)
except Exception as log_error:
retry_logger.warning(
f"Failed to log detailed error info: {log_error}"
)
retry_logger.debug(f"HTTP status {status_code} - {e}")
if status_code in retry_httpx_status_stop:
message = f"HTTP status {status_code} is in retry_httpx_status_stop"
raise RetryHTTPException(message) from e
except retry_ignore_exc_types as e:
logger.exception(e)
retry_logger.exception(e)
last_exception = e
logger.debug(
retry_logger.debug(
f"Retrying {fn_name} - in {retry_backoff_interval:.1f}s "
f"({monotonic() - start:.1f}s / {retry_timeout:.1f}s)"
)
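
The hunk above lets a caller pass its own `logger` keyword, which the wrapper pops before invoking the wrapped function. A minimal sketch of that pattern follows, assuming a simplified synchronous wrapper. `default_logger`, `ListHandler`, and `flaky` are hypothetical names used only for illustration; this is not the project's actual `retry` implementation.

```python
import logging

# Hypothetical module-level fallback, standing in for the shared logger.
default_logger = logging.getLogger("retry.default")


def retry(fn):
    """Simplified synchronous retry wrapper (illustration only)."""

    def wrapper(*args, **kwargs):
        # Pop retry-specific options so they are not forwarded to fn.
        attempts = kwargs.pop("retry_attempts", 3)
        retry_logger = kwargs.pop("logger", default_logger)
        last_exception = None
        for attempt in range(1, attempts + 1):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                last_exception = e
                retry_logger.warning(
                    "attempt %d of %d failed: %s", attempt, attempts, e
                )
        raise last_exception

    return wrapper


class ListHandler(logging.Handler):
    """Collects formatted messages so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


calls = {"n": 0}


def flaky():
    # Fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return "ok"


handler = ListHandler()
custom = logging.getLogger("retry.custom")
custom.addHandler(handler)
custom.setLevel(logging.WARNING)

# The per-call logger receives the retry messages instead of the default one.
result = retry(flaky)(logger=custom)
```

Popping the option rather than reading it keeps the wrapped function's signature free of retry-specific arguments.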

View File

@@ -1,55 +0,0 @@
"""
Utility file for server side asynchronous task running and config objects
"""
import asyncio
import contextlib
from functools import partial
from threading import Lock
from typing import ContextManager, Generic, TypeVar
def run_in_executor(func, *args, executor=None, **kwargs):
"""
Run the function in an executor, unblocking the main loop
:param func: Function to be run in executor
:param args: function parameters
:param executor: executor instance [Thread | Process]
:param kwargs: Additional parameters
:return: Future of function result upon completion
"""
callback = partial(func, *args, **kwargs)
loop = asyncio.get_event_loop()
return loop.run_in_executor(executor, callback)
# Generic type template
T = TypeVar("T")
class Mutex(Generic[T]):
"""
Mutex class to implement lock/release of a shared
protected variable
"""
def __init__(self, value: T):
"""
Create an instance of Mutex wrapper for the given resource
:param value: Shared resources to be thread protected
"""
self.__value = value
self.__lock = Lock()
@contextlib.contextmanager
def lock(self) -> ContextManager[T]:
"""
Lock the resource with a mutex to be used within a context block
The lock is automatically released on context exit
:return: Shared resource
"""
self.__lock.acquire()
try:
yield self.__value
finally:
self.__lock.release()

View File

@@ -0,0 +1,150 @@
"""
@vibe-generated
S3 Temporary File Context Manager
Provides automatic cleanup of S3 files with retry logic and proper error handling.
"""
from typing import Optional
from reflector.logger import logger
from reflector.storage.base import Storage
from reflector.utils.retry import retry
class S3TemporaryFile:
"""
Async context manager for temporary S3 files with automatic cleanup.
Ensures that uploaded files are deleted even if exceptions occur during processing.
Uses retry logic for all S3 operations to handle transient failures.
Example:
async with S3TemporaryFile(storage, "temp/audio.wav") as s3_file:
url = await s3_file.upload(audio_data)
# Use url for processing
# File is automatically cleaned up here
"""
def __init__(self, storage: Storage, filepath: str):
"""
Initialize the temporary file context.
Args:
storage: Storage instance for S3 operations
filepath: S3 key/path for the temporary file
"""
self.storage = storage
self.filepath = filepath
self.uploaded = False
self._url: Optional[str] = None
async def __aenter__(self):
"""Enter the context manager."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Exit the context manager and clean up the file.
Cleanup is attempted even if an exception occurred during processing.
Cleanup failures are logged but don't raise exceptions.
"""
if self.uploaded:
try:
await self._delete_with_retry()
logger.info(f"Successfully cleaned up S3 file: {self.filepath}")
except Exception as e:
# Log the error but don't raise - we don't want cleanup failures
# to mask the original exception
logger.warning(
f"Failed to cleanup S3 file {self.filepath} after retries: {e}"
)
return False # Don't suppress exceptions
async def upload(self, data: bytes) -> str:
"""
Upload data to S3 and return the public URL.
Args:
data: File data to upload
Returns:
Public URL for the uploaded file
Raises:
Exception: If upload or URL generation fails after retries
"""
await self._upload_with_retry(data)
self.uploaded = True
self._url = await self._get_url_with_retry()
return self._url
@property
def url(self) -> Optional[str]:
"""Get the URL of the uploaded file, if available."""
return self._url
async def _upload_with_retry(self, data: bytes):
"""Upload file to S3 with retry logic."""
async def upload():
await self.storage.put_file(self.filepath, data)
logger.debug(f"Successfully uploaded file to S3: {self.filepath}")
return True # Return something to indicate success
await retry(upload)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
async def _get_url_with_retry(self) -> str:
"""Get public URL for the file with retry logic."""
async def get_url():
url = await self.storage.get_file_url(self.filepath)
logger.debug(f"Generated public URL for S3 file: {self.filepath}")
return url
return await retry(get_url)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
async def _delete_with_retry(self):
"""Delete file from S3 with retry logic."""
async def delete():
await self.storage.delete_file(self.filepath)
logger.debug(f"Successfully deleted S3 file: {self.filepath}")
return True # Return something to indicate success
await retry(delete)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
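# Convenience function for simpler usage.
# Declared as a plain function: an `async def` would return a coroutine,
# which cannot be used directly in `async with` as the docstring example does.
def temporary_s3_file(storage: Storage, filepath: str) -> S3TemporaryFile: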
"""
Create a temporary S3 file context manager.
This is a convenience wrapper around S3TemporaryFile for simpler usage.
Args:
storage: Storage instance for S3 operations
filepath: S3 key/path for the temporary file
Example:
async with temporary_s3_file(storage, "temp/audio.wav") as s3_file:
url = await s3_file.upload(audio_data)
# Use url for processing
"""
return S3TemporaryFile(storage, filepath)
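
The cleanup-on-exit behaviour described in the docstrings above can be sketched without S3 at all. The example below is a simplified illustration under stated assumptions: `InMemoryStorage` and `TemporaryFile` are hypothetical stand-ins, the retry logic is omitted, and only the three storage methods called in the file above (`put_file`, `get_file_url`, `delete_file`) are mimicked.

```python
import asyncio


class InMemoryStorage:
    """Hypothetical stand-in for a storage backend; keeps files in a dict."""

    def __init__(self):
        self.files = {}

    async def put_file(self, path, data):
        self.files[path] = data

    async def get_file_url(self, path):
        return f"memory://{path}"

    async def delete_file(self, path):
        self.files.pop(path, None)


class TemporaryFile:
    """Simplified cleanup-on-exit context manager (no retries)."""

    def __init__(self, storage, filepath):
        self.storage = storage
        self.filepath = filepath
        self.uploaded = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Delete only if something was actually uploaded.
        if self.uploaded:
            await self.storage.delete_file(self.filepath)
        return False  # never suppress the caller's exception

    async def upload(self, data):
        await self.storage.put_file(self.filepath, data)
        self.uploaded = True
        return await self.storage.get_file_url(self.filepath)


async def demo():
    storage = InMemoryStorage()
    seen_url = None
    try:
        async with TemporaryFile(storage, "temp/audio.wav") as tmp:
            seen_url = await tmp.upload(b"fake audio bytes")
            raise RuntimeError("processing failed")
    except RuntimeError:
        pass  # the exception propagated; cleanup has already run
    return storage.files, seen_url


files, url = asyncio.run(demo())
```

Returning `False` from `__aexit__` is what lets the original error reach the caller while the file is still removed.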

View File

@@ -1,264 +0,0 @@
"""
Utility file for all text processing related functionalities
"""
import datetime
from typing import List
import nltk
import torch
from log_utils import LOGGER
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from run_utils import CONFIG
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import BartForConditionalGeneration, BartTokenizer
nltk.download("punkt", quiet=True)
def preprocess_sentence(sentence: str) -> str:
"""
Filter out undesirable tokens from the sentence
:param sentence:
:return:
"""
stop_words = set(stopwords.words("english"))
tokens = word_tokenize(sentence.lower())
tokens = [token for token in tokens if token.isalnum() and token not in stop_words]
return " ".join(tokens)
def compute_similarity(sent1: str, sent2: str) -> float:
"""
Compute the similarity
"""
tfidf_vectorizer = TfidfVectorizer()
if sent1 is not None and sent2 is not None:
tfidf_matrix = tfidf_vectorizer.fit_transform([sent1, sent2])
return cosine_similarity(tfidf_matrix[0], tfidf_matrix[1])[0][0]
return 0.0
def remove_almost_alike_sentences(sentences: List[str], threshold=0.7) -> List[str]:
"""
Filter sentences that are similar beyond a set threshold
:param sentences:
:param threshold:
:return:
"""
num_sentences = len(sentences)
removed_indices = set()
for i in range(num_sentences):
if i not in removed_indices:
for j in range(i + 1, num_sentences):
if j not in removed_indices:
l_i = len(sentences[i])
l_j = len(sentences[j])
if l_i == 0 or l_j == 0:
if l_i == 0:
removed_indices.add(i)
if l_j == 0:
removed_indices.add(j)
else:
sentence1 = preprocess_sentence(sentences[i])
sentence2 = preprocess_sentence(sentences[j])
if len(sentence1) != 0 and len(sentence2) != 0:
similarity = compute_similarity(sentence1, sentence2)
if similarity >= threshold:
removed_indices.add(max(i, j))
filtered_sentences = [
sentences[i] for i in range(num_sentences) if i not in removed_indices
]
return filtered_sentences
def remove_outright_duplicate_sentences_from_chunk(chunk: str) -> List[str]:
"""
Remove repetitive sentences
:param chunk:
:return:
"""
chunk_text = chunk["text"]
sentences = nltk.sent_tokenize(chunk_text)
nonduplicate_sentences = list(dict.fromkeys(sentences))
return nonduplicate_sentences
def remove_whisper_repetitive_hallucination(
nonduplicate_sentences: List[str],
) -> List[str]:
"""
Remove sentences that are repeated as a result of Whisper
hallucinations
:param nonduplicate_sentences:
:return:
"""
chunk_sentences = []
for sent in nonduplicate_sentences:
temp_result = ""
seen = {}
words = nltk.word_tokenize(sent)
n_gram_filter = 3
for i in range(len(words)):
if (
str(words[i : i + n_gram_filter]) in seen
and seen[str(words[i : i + n_gram_filter])]
== words[i + 1 : i + n_gram_filter + 2]
):
pass
else:
seen[str(words[i : i + n_gram_filter])] = words[
i + 1 : i + n_gram_filter + 2
]
temp_result += words[i]
temp_result += " "
chunk_sentences.append(temp_result)
return chunk_sentences
def post_process_transcription(whisper_result: dict) -> dict:
"""
Parent function to perform post-processing on the transcription result
:param whisper_result:
:return:
"""
transcript_text = ""
for chunk in whisper_result["chunks"]:
nonduplicate_sentences = remove_outright_duplicate_sentences_from_chunk(chunk)
chunk_sentences = remove_whisper_repetitive_hallucination(
nonduplicate_sentences
)
similarity_matched_sentences = remove_almost_alike_sentences(chunk_sentences)
chunk["text"] = " ".join(similarity_matched_sentences)
transcript_text += chunk["text"]
whisper_result["text"] = transcript_text
return whisper_result
def summarize_chunks(chunks: List[str], tokenizer, model) -> List[str]:
"""
Summarize each chunk using a summarizer model
:param chunks:
:param tokenizer:
:param model:
:return:
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summaries = []
for c in chunks:
input_ids = tokenizer.encode(c, return_tensors="pt")
input_ids = input_ids.to(device)
with torch.no_grad():
summary_ids = model.generate(
input_ids,
num_beams=int(CONFIG["SUMMARIZER"]["BEAM_SIZE"]),
length_penalty=2.0,
max_length=int(CONFIG["SUMMARIZER"]["MAX_LENGTH"]),
early_stopping=True,
)
summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
summaries.append(summary)
return summaries
def chunk_text(
text: str, max_chunk_length: int = int(CONFIG["SUMMARIZER"]["MAX_CHUNK_LENGTH"])
) -> List[str]:
"""
Split text into smaller chunks.
:param text: Text to be chunked
:param max_chunk_length: length of chunk
:return: chunked texts
"""
sentences = nltk.sent_tokenize(text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < max_chunk_length:
current_chunk += f" {sentence.strip()}"
else:
chunks.append(current_chunk.strip())
current_chunk = f"{sentence.strip()}"
chunks.append(current_chunk.strip())
return chunks
def summarize(
transcript_text: str,
timestamp: datetime.datetime.timestamp,
real_time: bool = False,
chunk_summarize: str = CONFIG["SUMMARIZER"]["SUMMARIZE_USING_CHUNKS"],
):
"""
Summarize the given text either as a whole or as chunks as needed
:param transcript_text:
:param timestamp:
:param real_time:
:param chunk_summarize:
:return:
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summary_model = CONFIG["SUMMARIZER"]["SUMMARY_MODEL"]
if not summary_model:
summary_model = "facebook/bart-large-cnn"
# Summarize the generated transcript using the BART model
LOGGER.info(f"Loading BART model: {summary_model}")
tokenizer = BartTokenizer.from_pretrained(summary_model)
model = BartForConditionalGeneration.from_pretrained(summary_model)
model = model.to(device)
output_file = "summary_" + timestamp.strftime("%m-%d-%Y_%H:%M:%S") + ".txt"
if real_time:
output_file = "real_time_" + output_file
if chunk_summarize != "YES":
max_length = int(CONFIG["SUMMARIZER"]["INPUT_ENCODING_MAX_LENGTH"])
inputs = tokenizer.batch_encode_plus(
[transcript_text],
truncation=True,
padding="longest",
max_length=max_length,
return_tensors="pt",
)
inputs = inputs.to(device)
with torch.no_grad():
num_beams = int(CONFIG["SUMMARIZER"]["BEAM_SIZE"])
max_length = int(CONFIG["SUMMARIZER"]["MAX_LENGTH"])
summaries = model.generate(
inputs["input_ids"],
num_beams=num_beams,
length_penalty=2.0,
max_length=max_length,
early_stopping=True,
)
decoded_summaries = [
tokenizer.decode(
summary, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
for summary in summaries
]
summary = " ".join(decoded_summaries)
with open("./artefacts/" + output_file, "w", encoding="utf-8") as file:
file.write(summary.strip() + "\n")
else:
LOGGER.info("Breaking transcript into smaller chunks")
chunks = chunk_text(transcript_text)
LOGGER.info(
f"Transcript broken into {len(chunks)} " f"chunks of at most 500 words"
)
LOGGER.info(f"Writing summary text to: {output_file}")
with open(output_file, "w") as f:
summaries = summarize_chunks(chunks, tokenizer, model)
for summary in summaries:
f.write(summary.strip() + " ")

View File

@@ -1,283 +0,0 @@
"""
Utility file for all visualization related functions
"""
import ast
import collections
import datetime
import os
import pickle
from typing import NoReturn
import matplotlib.pyplot as plt
import pandas as pd
import scattertext as st
import spacy
from nltk.corpus import stopwords
from wordcloud import STOPWORDS, WordCloud
en = spacy.load("en_core_web_md")
spacy_stopwords = en.Defaults.stop_words
STOPWORDS = (
set(STOPWORDS).union(set(stopwords.words("english"))).union(set(spacy_stopwords))
)
def create_wordcloud(
timestamp: datetime.datetime.timestamp, real_time: bool = False
) -> NoReturn:
"""
Create a basic word cloud visualization of transcribed text
:return: None. The wordcloud image is saved locally
"""
filename = "transcript"
if real_time:
filename = (
"real_time_"
+ filename
+ "_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".txt"
)
else:
filename += "_" + timestamp.strftime("%m-%d-%Y_%H:%M:%S") + ".txt"
with open("./artefacts/" + filename, "r") as f:
transcription_text = f.read()
# python_mask = np.array(PIL.Image.open("download1.png"))
wordcloud = WordCloud(
height=800,
width=800,
background_color="white",
stopwords=STOPWORDS,
min_font_size=8,
).generate(transcription_text)
# Plot wordcloud and save image
plt.figure(facecolor=None)
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.tight_layout(pad=0)
wordcloud = "wordcloud"
if real_time:
wordcloud = (
"real_time_"
+ wordcloud
+ "_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".png"
)
else:
wordcloud += "_" + timestamp.strftime("%m-%d-%Y_%H:%M:%S") + ".png"
plt.savefig("./artefacts/" + wordcloud)
def create_talk_diff_scatter_viz(
timestamp: datetime.datetime.timestamp, real_time: bool = False
) -> NoReturn:
"""
Perform agenda vs transcription diff to see covered topics.
Create a scatter plot of words in topics.
:return: None. Saved locally.
"""
spacy_model = "en_core_web_md"
nlp = spacy.load(spacy_model)
nlp.add_pipe("sentencizer")
agenda_topics = []
agenda = []
# Load the agenda
with open(os.path.join(os.getcwd(), "agenda-headers.txt"), "r") as f:
for line in f.readlines():
if line.strip():
agenda.append(line.strip())
agenda_topics.append(line.split(":")[0])
# Load the transcription with timestamp
if real_time:
filename = (
"./artefacts/real_time_transcript_with_timestamp_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".txt"
)
else:
filename = (
"./artefacts/transcript_with_timestamp_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".txt"
)
with open(filename) as file:
transcription_timestamp_text = file.read()
res = ast.literal_eval(transcription_timestamp_text)
chunks = res["chunks"]
# create df for processing
df = pd.DataFrame.from_dict(res["chunks"])
covered_items = {}
# ts: timestamp
# Map each timestamped chunk with top1 and top2 matched agenda
ts_to_topic_mapping_top_1 = {}
ts_to_topic_mapping_top_2 = {}
# Also create a mapping of the different timestamps
# in which each topic was covered
topic_to_ts_mapping_top_1 = collections.defaultdict(list)
topic_to_ts_mapping_top_2 = collections.defaultdict(list)
similarity_threshold = 0.7
for c in chunks:
doc_transcription = nlp(c["text"])
topic_similarities = []
for item in range(len(agenda)):
item_doc = nlp(agenda[item])
# if not doc_transcription or not all
# (token.has_vector for token in doc_transcription):
if not doc_transcription:
continue
similarity = doc_transcription.similarity(item_doc)
topic_similarities.append((item, similarity))
topic_similarities.sort(key=lambda x: x[1], reverse=True)
for i in range(2):
if topic_similarities[i][1] >= similarity_threshold:
covered_items[agenda[topic_similarities[i][0]]] = True
# top1 match
if i == 0:
ts_to_topic_mapping_top_1[c["timestamp"]] = agenda_topics[
topic_similarities[i][0]
]
topic_to_ts_mapping_top_1[
agenda_topics[topic_similarities[i][0]]
].append(c["timestamp"])
# top2 match
else:
ts_to_topic_mapping_top_2[c["timestamp"]] = agenda_topics[
topic_similarities[i][0]
]
topic_to_ts_mapping_top_2[
agenda_topics[topic_similarities[i][0]]
].append(c["timestamp"])
def create_new_columns(record: dict) -> dict:
"""
Accumulate the mapping information into the df
:param record:
:return:
"""
record["ts_to_topic_mapping_top_1"] = ts_to_topic_mapping_top_1[
record["timestamp"]
]
record["ts_to_topic_mapping_top_2"] = ts_to_topic_mapping_top_2[
record["timestamp"]
]
return record
df = df.apply(create_new_columns, axis=1)
# Count the number of items covered and calculate the percentage
num_covered_items = sum(covered_items.values())
percentage_covered = num_covered_items / len(agenda) * 100
# Print the results
print("💬 Agenda items covered in the transcription:")
for item in agenda:
if item in covered_items and covered_items[item]:
print("", item)
else:
print("", item)
print("📊 Coverage: {:.2f}%".format(percentage_covered))
# Save df, mappings for further experimentation
df_name = "df"
if real_time:
df_name = (
"real_time_"
+ df_name
+ "_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".pkl"
)
else:
df_name += "_" + timestamp.strftime("%m-%d-%Y_%H:%M:%S") + ".pkl"
df.to_pickle("./artefacts/" + df_name)
my_mappings = [
ts_to_topic_mapping_top_1,
ts_to_topic_mapping_top_2,
topic_to_ts_mapping_top_1,
topic_to_ts_mapping_top_2,
]
mappings_name = "mappings"
if real_time:
mappings_name = (
"real_time_"
+ mappings_name
+ "_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".pkl"
)
else:
mappings_name += "_" + timestamp.strftime("%m-%d-%Y_%H:%M:%S") + ".pkl"
pickle.dump(my_mappings, open("./artefacts/" + mappings_name, "wb"))
# to load, my_mappings = pickle.load( open ("mappings.pkl", "rb") )
# pick the 2 most matched topic to be used for plotting
topic_times = collections.defaultdict(int)
for key in ts_to_topic_mapping_top_1.keys():
if key[0] is None or key[1] is None:
continue
duration = key[1] - key[0]
topic_times[ts_to_topic_mapping_top_1[key]] += duration
topic_times = sorted(topic_times.items(), key=lambda x: x[1], reverse=True)
if len(topic_times) > 1:
cat_1 = topic_times[0][0]
cat_1_name = topic_times[0][0]
cat_2_name = topic_times[1][0]
# Scatter plot of topics
df = df.assign(parse=lambda df: df.text.apply(st.whitespace_nlp_with_sentences))
corpus = (
st.CorpusFromParsedDocuments(
df, category_col="ts_to_topic_mapping_top_1", parsed_col="parse"
)
.build()
.get_unigram_corpus()
.compact(st.AssociationCompactor(2000))
)
html = st.produce_scattertext_explorer(
corpus,
category=cat_1,
category_name=cat_1_name,
not_category_name=cat_2_name,
minimum_term_frequency=0,
pmi_threshold_coefficient=0,
width_in_pixels=1000,
transform=st.Scalers.dense_rank,
)
if real_time:
with open(
"./artefacts/real_time_scatter_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".html",
"w",
) as file:
file.write(html)
else:
with open(
"./artefacts/scatter_"
+ timestamp.strftime("%m-%d-%Y_%H:%M:%S")
+ ".html",
"w",
) as file:
file.write(html)

View File

@@ -1,10 +1,10 @@
from datetime import datetime
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,

View File

@@ -1,17 +1,23 @@
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Annotated, Optional, Literal
from typing import Annotated, Literal, Optional
import reflector.auth as auth
import asyncpg.exceptions
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import paginate
from pydantic import BaseModel
import reflector.auth as auth
from reflector.db import database
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.settings import settings
from reflector.whereby import create_meeting, upload_logo
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -149,19 +155,47 @@ async def rooms_create_meeting(
if meeting is None:
end_date = current_time + timedelta(hours=8)
meeting = await create_meeting("", end_date=end_date, room=room)
await upload_logo(meeting["roomName"], "./images/logo.png")
meeting = await meetings_controller.create(
id=meeting["meetingId"],
room_name=meeting["roomName"],
room_url=meeting["roomUrl"],
host_room_url=meeting["hostRoomUrl"],
start_date=datetime.fromisoformat(meeting["startDate"]),
end_date=datetime.fromisoformat(meeting["endDate"]),
user_id=user_id,
room=room,
)
whereby_meeting = await create_meeting("", end_date=end_date, room=room)
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
# Now try to save to database
try:
meeting = await meetings_controller.create(
id=whereby_meeting["meetingId"],
room_name=whereby_meeting["roomName"],
room_url=whereby_meeting["roomUrl"],
host_room_url=whereby_meeting["hostRoomUrl"],
start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
user_id=user_id,
room=room,
)
except (asyncpg.exceptions.UniqueViolationError, sqlite3.IntegrityError):
# Another request already created a meeting for this room
# Log this race condition occurrence
logger.info(
"Race condition detected for room %s - fetching existing meeting",
room.name,
)
logger.warning(
"Whereby meeting %s was created but not used (resource leak) for room %s",
whereby_meeting["meetingId"],
room.name,
)
# Fetch the meeting that was created by the other request
meeting = await meetings_controller.get_active(
room=room, current_time=current_time
)
if meeting is None:
# Edge case: meeting was created but expired/deleted between checks
logger.error(
"Meeting disappeared after race condition for room %s", room.name
)
raise HTTPException(
status_code=503, detail="Unable to join meeting - please try again"
)
if user_id != room.user_id:
meeting.host_room_url = ""
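
The race handling in this hunk follows an insert-then-fallback pattern: attempt the insert, and if a uniqueness constraint rejects it, fetch the row created by the competing request. A minimal sketch with the standard-library `sqlite3` module follows. The `meeting` table, its `UNIQUE` constraint on `room`, and `get_or_create_meeting` are assumptions made for illustration, not the project's schema or controller API.

```python
import sqlite3


def get_or_create_meeting(conn, room, meeting_id):
    """Insert a meeting for the room; if another writer won, reuse its row."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO meeting (room, id) VALUES (?, ?)",
                (room, meeting_id),
            )
        return meeting_id
    except sqlite3.IntegrityError:
        # Another request already created a meeting for this room.
        row = conn.execute(
            "SELECT id FROM meeting WHERE room = ?", (room,)
        ).fetchone()
        if row is None:
            # Edge case: the winning row vanished between the two statements.
            raise RuntimeError("meeting disappeared after race")
        return row[0]


conn = sqlite3.connect(":memory:")
# Assumed schema: at most one meeting per room.
conn.execute("CREATE TABLE meeting (room TEXT UNIQUE, id TEXT)")

first = get_or_create_meeting(conn, "standup", "m-1")
second = get_or_create_meeting(conn, "standup", "m-2")  # loses the race
```

As in the hunk, the losing caller's externally created resource (here the unused `m-2` identifier) is not reclaimed, which is why the diff logs it as a leak.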

View File

@@ -6,6 +6,7 @@ from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
from fastapi import APIRouter, Request
from prometheus_client import Gauge
from pydantic import BaseModel
from reflector.events import subscribers_shutdown
from reflector.logger import logger
from reflector.pipelines.runner import PipelineRunner

View File

@@ -1,12 +1,13 @@
from datetime import datetime, timedelta, timezone
from typing import Annotated, Literal, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import paginate
from jose import jwt
from pydantic import BaseModel, Field, field_serializer
import reflector.auth as auth
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (

View File

@@ -7,9 +7,10 @@ Transcripts audio related endpoints
from typing import Annotated, Optional
import httpx
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from jose import jwt
import reflector.auth as auth
from reflector.db.transcripts import AudioWaveform, transcripts_controller
from reflector.settings import settings
from reflector.views.transcripts import ALGORITHM

View File

@@ -6,9 +6,10 @@ Transcript participants API endpoints
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict, Field
import reflector.auth as auth
from reflector.db.transcripts import TranscriptParticipant, transcripts_controller
from reflector.views.types import DeletionStatus

View File

@@ -1,9 +1,10 @@
from typing import Annotated, Optional
import celery
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import task_pipeline_process

View File

@@ -6,9 +6,10 @@ Reassign speakers in a transcript
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
router = APIRouter()

View File

@@ -1,9 +1,10 @@
from typing import Annotated, Optional
import av
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException, UploadFile
from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import task_pipeline_process

View File

@@ -1,7 +1,8 @@
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException, Request
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from .rtc_offer import RtcOffer, rtc_offer_base

View File

@@ -5,6 +5,7 @@ Transcripts websocket API
"""
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from reflector.db.transcripts import transcripts_controller
from reflector.ws_manager import get_ws_manager

View File

@@ -1,9 +1,10 @@
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends
from pydantic import BaseModel
import reflector.auth as auth
router = APIRouter()

View File

@@ -7,6 +7,7 @@ from hashlib import sha256
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from reflector.db.meetings import meetings_controller
from reflector.settings import settings

View File

@@ -1,8 +1,9 @@
from typing import Annotated, Optional
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import reflector.auth as auth
from reflector.zulip import get_zulip_streams, get_zulip_topics
router = APIRouter()

View File

@@ -1,6 +1,7 @@
from datetime import datetime
import httpx
from reflector.db.rooms import Room
from reflector.settings import settings

Some files were not shown because too many files have changed in this diff