diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py
index 52acd27a..912aa220 100644
--- a/server/reflector/hatchet/workflows/diarization_pipeline.py
+++ b/server/reflector/hatchet/workflows/diarization_pipeline.py
@@ -81,6 +81,7 @@ from reflector.utils.daily import (
     parse_daily_recording_filename,
 )
 from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
+from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
 from reflector.zulip import post_transcript_notification
 
 
@@ -558,9 +559,6 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
         TranscriptTopic,
         transcripts_controller,
     )
-    from reflector.utils.transcript_constants import (  # noqa: PLC0415
-        TOPIC_CHUNK_WORD_COUNT,
-    )
 
     chunk_size = TOPIC_CHUNK_WORD_COUNT
     chunks = []
@@ -606,13 +604,11 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
 
     results = await topic_chunk_workflow.aio_run_many(bulk_runs)
 
-    topic_results = []
-    for result in results:
-        task_result = result.get("detect_chunk_topic", {})
-        if task_result:
-            topic_results.append(task_result)
-
-    topic_results.sort(key=lambda t: t.get("timestamp", 0))
+    topic_results = [
+        result.get("detect_chunk_topic", {})
+        for result in results
+        if result.get("detect_chunk_topic")
+    ]
 
     async with fresh_db_connection():
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
@@ -821,10 +817,9 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
 
     results = await subject_workflow.aio_run_many(bulk_runs)
 
-    subject_summaries = []
-    for result in results:
-        task_result = result.get("generate_detailed_summary", {})
-        subject_summaries.append(task_result)
+    subject_summaries = [
+        result.get("generate_detailed_summary", {}) for result in results
+    ]
 
     ctx.log(f"process_subjects complete: {len(subject_summaries)} summaries")
 