LCORE-1514: generate topic summary for first interrupted conversation #1359

Merged
tisnik merged 2 commits into lightspeed-core:main from karthikjeeyar:fix/topic-summary-on-interuppted-conv on Mar 19, 2026

Conversation


@karthikjeeyar karthikjeeyar commented Mar 19, 2026

Description

Generates the topic summary for the first conversation that is interrupted.

Type of change

  • Bug fix
  • Unit tests improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: Cursor
  • Generated by: (e.g., tool name and version; N/A if not used)

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of interrupted conversations: for new conversations, topic summaries are now generated when requested, included in persisted results, and failures are logged and do not prevent storing the interrupted turn.
  • Tests

    • Added tests covering cancellation scenarios for new conversations: successful summary generation, summary generation failures, and when summary generation is disabled.


coderabbitai bot commented Mar 19, 2026

Walkthrough

Conditionally generate and persist a topic summary for interrupted streaming-query turns when starting a new conversation (no conversation_id) and generate_topic_summary is true; exceptions during summary generation are caught and logged, and persistence proceeds with topic_summary=None if generation fails.

Changes

• Streaming Query Implementation (src/app/endpoints/streaming_query.py):
  In _persist_interrupted_turn, initialize topic_summary=None; when context.query_request.conversation_id is absent and generate_topic_summary is true, call get_topic_summary(query, client, model), catch and log exceptions, and pass topic_summary (possibly None) to store_query_results(...) instead of always None.
• Streaming Query Tests (tests/unit/app/endpoints/test_streaming_query.py):
  Updated the cancelled-stream test to use a fixed existing conversation UUID; added three async tests for interruption on new conversations: (1) generate_topic_summary=True and get_topic_summary succeeds: assert get_topic_summary is called and the returned summary is persisted; (2) generate_topic_summary=True and get_topic_summary raises: assert the exception is logged and topic_summary=None is persisted; (3) generate_topic_summary=False: assert get_topic_summary is not called and topic_summary=None is persisted. All tests assert the interruption output and deregister_stream(...).
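The control flow described above can be sketched roughly as follows. The function signature and argument shapes here are simplified assumptions for illustration, not the module's exact API:

```python
import asyncio
import logging
from unittest.mock import AsyncMock, MagicMock

logger = logging.getLogger(__name__)

async def persist_interrupted_turn(context, responses_params,
                                   store_query_results, get_topic_summary):
    # A summary is only attempted for brand-new conversations that asked for it.
    topic_summary = None
    if (not context.query_request.conversation_id
            and context.query_request.generate_topic_summary):
        try:
            topic_summary = await get_topic_summary(
                context.query_request.query, context.client, responses_params.model
            )
        except Exception:  # broad by design: never block persistence
            logger.exception("Topic summary generation failed for interrupted turn")
    # Persistence proceeds either way, possibly with topic_summary=None.
    store_query_results(topic_summary=topic_summary)

# Usage: a failing summarizer still persists the interrupted turn.
context = MagicMock()
context.query_request.conversation_id = None
context.query_request.generate_topic_summary = True
store = MagicMock()
asyncio.run(persist_interrupted_turn(
    context, MagicMock(), store, AsyncMock(side_effect=RuntimeError("down"))
))
print(store.call_args.kwargs["topic_summary"])  # None
```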

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Endpoint as StreamingEndpoint
    participant Summarizer as TopicSummarizer
    participant Store as Storage
    participant StreamMgr as StreamManager

    Client->>Endpoint: send query (no conversation_id, generate_topic_summary?)
    Endpoint->>StreamMgr: register stream / start
    alt generate_topic_summary == true
        Endpoint->>Summarizer: get_topic_summary(query, client, model)
        Note right of Summarizer: may succeed or raise
        Summarizer-->>Endpoint: topic_summary / error
    end
    Endpoint->>Store: store_query_results(..., topic_summary)
    Endpoint->>StreamMgr: deregister_stream(request_id)
    StreamMgr-->>Client: send interruption output
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

• Docstring Coverage: ⚠️ Warning. Docstring coverage is 60.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)
• Description Check: ✅ Passed. Check skipped; CodeRabbit’s high-level summary is enabled.
• Title check: ✅ Passed. The title clearly and specifically describes the main change (generating topic summaries for interrupted first conversations), which directly matches the core implementation in the changeset.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/app/endpoints/streaming_query.py (1)

406-415: ⚠️ Potential issue | 🟡 Minor

Preserve attachments when persisting interrupted turns.

The completed-stream path at Lines 586-596 passes attachments=context.query_request.attachments, but this branch still doesn't. Since store_query_results() forwards attachments into transcript creation in src/utils/query.py:226-290, interrupted requests will lose attachment metadata whenever transcript collection is enabled.

📎 Keep interrupted persistence aligned with the normal path

```diff
         store_query_results(
             user_id=context.user_id,
             conversation_id=context.conversation_id,
             model=responses_params.model,
             completed_at=completed_at,
             started_at=context.started_at,
             summary=turn_summary,
             query=context.query_request.query,
+            attachments=context.query_request.attachments,
             skip_userid_check=context.skip_userid_check,
             topic_summary=topic_summary,
         )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/streaming_query.py` around lines 406 - 415, The
interrupted-turn persistence branch calling store_query_results currently omits
attachments, causing lost attachment metadata; update that call
(store_query_results) to include attachments=context.query_request.attachments
so the persisted transcript (as later consumed by the transcript creation in
query utilities) retains attachment data, ensuring the interrupted-stream path
matches the completed-stream path’s arguments.
🧹 Nitpick comments (1)
tests/unit/app/endpoints/test_streaming_query.py (1)

1457-1524: Cover the topic-summary failure branch too.

Lines 395-401 are the new guard that keeps interrupted persistence alive when get_topic_summary() fails, but the added tests only exercise the success and disabled cases. Please add a case where the mock raises and assert we still emit "interrupted" and store topic_summary=None.

Also applies to: 1526-1589

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1457 - 1524,
Add a new test similar to
test_generate_response_cancelled_persists_topic_summary_for_new_conversation
that simulates get_topic_summary raising an exception: patch
app.endpoints.streaming_query.get_topic_summary to an
AsyncMock(side_effect=Exception("err")), run generate_response with the same
mock_generator, mock_context, mock_responses_params and mock_turn_summary,
assert the yielded stream contains an "interrupted" event, assert
store_query_results was called and that its call kwargs include
topic_summary=None, and verify
isolate_stream_interrupt_registry.deregister_stream was called with the request
id; this covers the failure branch in generate_response where topic summary
lookup fails.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 94c37806-1e3c-400d-ac40-a1e87a8357df

📥 Commits

Reviewing files that changed from the base of the PR and between 5a52503 and 0c243db.

📒 Files selected for processing (2)
  • src/app/endpoints/streaming_query.py
  • tests/unit/app/endpoints/test_streaming_query.py

Comment on lines +383 to +394
```python
if not context.query_request.conversation_id:
    should_generate = context.query_request.generate_topic_summary
    if should_generate:
        try:
            logger.debug(
                "Generating topic summary for interrupted new conversation"
            )
            topic_summary = await get_topic_summary(
                context.query_request.query,
                context.client,
                responses_params.model,
            )
```

⚠️ Potential issue | 🟠 Major

Don't put an unbounded summary RPC in front of the interrupt ack.

get_topic_summary() does another Llama Stack request (src/utils/responses.py:114-148), and this branch runs before Line 542 yields the "interrupted" event. If that backend is slow or degraded, a user-initiated cancel now waits on best-effort metadata instead of closing promptly. Please bound this call with a short timeout or move it off the response path.

⏱️ Example way to cap the cancel-path latency

```diff
                 try:
                     logger.debug(
                         "Generating topic summary for interrupted new conversation"
                     )
-                    topic_summary = await get_topic_summary(
-                        context.query_request.query,
-                        context.client,
-                        responses_params.model,
-                    )
+                    topic_summary = await asyncio.wait_for(
+                        get_topic_summary(
+                            context.query_request.query,
+                            context.client,
+                            responses_params.model,
+                        ),
+                        timeout=2,
+                    )
+                except TimeoutError as e:
+                    logger.warning(
+                        "Timed out generating topic summary for interrupted turn, "
+                        "request %s: %s",
+                        context.request_id,
+                        e,
+                    )
                 except Exception as e:  # pylint: disable=broad-except
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/streaming_query.py` around lines 383 - 394, This branch
calls get_topic_summary(...) synchronously before yielding the "interrupted"
event, which can block interrupt acknowledgements; wrap the call with a short
timeout (e.g., use asyncio.wait_for around get_topic_summary) and catch
asyncio.TimeoutError so a slow backend doesn't delay the interrupt path, or
alternatively offload it via asyncio.create_task to run in background and never
await it on the critical path; update the code around
context.query_request.conversation_id and should_generate to use the bounded
call or background task and ensure errors/timeouts are logged but do not prevent
emitting the interrupted event.
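Both mitigations named in the prompt, a bounded wait and a background task, can be sketched as below. The get_topic_summary here is a deliberately slow stand-in stub, not the project's real helper:

```python
import asyncio

async def get_topic_summary(query: str) -> str:
    """Illustrative stand-in for the real summarizer RPC."""
    await asyncio.sleep(5)  # simulate a slow or degraded backend
    return "summary of " + query

async def bounded_summary(query: str, timeout: float = 0.1):
    # Option 1: cap the wait so the cancel path never blocks longer
    # than `timeout`; a timeout simply degrades to topic_summary=None.
    try:
        return await asyncio.wait_for(get_topic_summary(query), timeout=timeout)
    except asyncio.TimeoutError:
        return None

async def interrupt_path():
    # Option 2: run the summary as a background task so the "interrupted"
    # event is never gated on it at all.
    task = asyncio.create_task(get_topic_summary("what is kubernetes?"))
    summary = await bounded_summary("what is kubernetes?")
    task.cancel()  # demo cleanup; a real impl would log the task's outcome
    try:
        await task
    except asyncio.CancelledError:
        pass
    return summary

result = asyncio.run(interrupt_path())
print(result)  # None: the 5 s stub exceeded the 0.1 s budget
```

Either option keeps the interrupt acknowledgement prompt while treating the summary as best-effort metadata.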

@tisnik tisnik left a comment

LGTM

@tisnik tisnik requested a review from jrobertboos March 19, 2026 15:24
@jrobertboos jrobertboos left a comment

Overall LGTM.

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
tests/unit/app/endpoints/test_streaming_query.py (2)

1520-1521: Strengthen persistence assertions before reading call args

At Line 1520 and Line 1650, assert store_query_results invocation count before inspecting kwargs, so duplicate persistence calls cannot slip through.

Suggested test hardening diff

```diff
-        call_kwargs = store_query_results_mock.call_args[1]
+        store_query_results_mock.assert_called_once()
+        call_kwargs = store_query_results_mock.call_args[1]
         assert call_kwargs["topic_summary"] == "Kubernetes container orchestration"
@@
-        call_kwargs = store_query_results_mock.call_args[1]
+        store_query_results_mock.assert_called_once()
+        call_kwargs = store_query_results_mock.call_args[1]
         assert call_kwargs["topic_summary"] is None
```

Also applies to: 1650-1651

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1520 - 1521,
The test inspects store_query_results_mock.call_args[1] without first asserting
how many times store_query_results was called, which could hide duplicate
persistence calls; before reading call_args in the tests that reference
store_query_results_mock (e.g., the assertions around topic_summary), add an
assertion on store_query_results_mock.assert_called_once() or assert
store_query_results_mock.call_count == 1 to ensure exactly one invocation, then
read call_kwargs = store_query_results_mock.call_args[1] and assert
call_kwargs["topic_summary"] == "Kubernetes container orchestration".

1457-1655: Consider parameterizing the three new cancellation/topic-summary tests

The setup and assertions are largely duplicated; a pytest.mark.parametrize matrix for (generate_topic_summary, get_topic_summary_side_effect, expected_topic_summary) would keep this easier to maintain.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1457 - 1655,
Consolidate the three nearly-identical tests into a single parametrized test
using pytest.mark.parametrize over (generate_topic_summary,
get_topic_summary_side_effect, expected_topic_summary) to avoid duplication:
update the test function (e.g.,
test_generate_response_cancelled_topic_summary_variants) to accept those params,
create the same mock_generator, mock_context (use a single test_request_id or
parametrize it), patch app.endpoints.streaming_query.get_topic_summary with
either new=mocker.AsyncMock(return_value=...) or
new=mocker.AsyncMock(side_effect=...) based on get_topic_summary_side_effect,
patch store_query_results and append_turn_to_conversation as before, call
generate_response and assert the interrupted event, then assert
get_topic_summary was called only when generate_topic_summary is True and
expected_topic_summary is used in
store_query_results_mock.call_args[1]["topic_summary"], and still assert
isolate_stream_interrupt_registry.deregister_stream was called with the request
id.
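One possible shape for that consolidation is shown below. The case matrix and the inlined guard are invented for illustration and stand in for the real fixtures and module under test:

```python
import asyncio
from unittest.mock import AsyncMock

import pytest

# (generate_topic_summary, summarizer result or exception, expected persisted value)
TOPIC_SUMMARY_CASES = [
    (True, "Kubernetes container orchestration", "Kubernetes container orchestration"),
    (True, RuntimeError("summary backend down"), None),
    (False, None, None),
]

@pytest.mark.parametrize("generate_flag, summary_result, expected", TOPIC_SUMMARY_CASES)
def test_cancelled_topic_summary_variants(generate_flag, summary_result, expected):
    failing = isinstance(summary_result, Exception)
    get_topic_summary = AsyncMock(
        side_effect=summary_result if failing else None,
        return_value=None if failing else summary_result,
    )

    async def persist_interrupted_turn():
        # Mirrors the guard under review: new conversation with the flag set
        # attempts the summary; any failure falls back to None.
        topic_summary = None
        if generate_flag:
            try:
                topic_summary = await get_topic_summary("query", None, "model")
            except Exception:  # broad on purpose: persistence must proceed
                topic_summary = None
        return topic_summary

    assert asyncio.run(persist_interrupted_turn()) == expected
    assert get_topic_summary.called is generate_flag
```

A single matrix like this makes adding a fourth scenario (for example, a timeout case) a one-line change.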

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 16f5caf6-725e-471a-951f-0b05abd4d1e7

📥 Commits

Reviewing files that changed from the base of the PR and between 0c243db and b5a6240.

📒 Files selected for processing (1)
  • tests/unit/app/endpoints/test_streaming_query.py

@tisnik tisnik merged commit 609d73d into lightspeed-core:main Mar 19, 2026
21 of 22 checks passed