LCORE-1514: generate topic summary for first interrupted conversation #1359
Walkthrough
Conditionally generate and persist a topic summary for interrupted streaming-query turns when starting a new conversation (no conversation_id).
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/endpoints/streaming_query.py (1)
406-415: ⚠️ Potential issue | 🟡 Minor
Preserve attachments when persisting interrupted turns.
The completed-stream path at Lines 586-596 passes `attachments=context.query_request.attachments`, but this branch still doesn't. Since `store_query_results()` forwards `attachments` into transcript creation in `src/utils/query.py:226-290`, interrupted requests will lose attachment metadata whenever transcript collection is enabled.

📎 Keep interrupted persistence aligned with the normal path

```diff
 store_query_results(
     user_id=context.user_id,
     conversation_id=context.conversation_id,
     model=responses_params.model,
     completed_at=completed_at,
     started_at=context.started_at,
     summary=turn_summary,
     query=context.query_request.query,
+    attachments=context.query_request.attachments,
     skip_userid_check=context.skip_userid_check,
     topic_summary=topic_summary,
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/app/endpoints/streaming_query.py` around lines 406 - 415, The interrupted-turn persistence branch calling store_query_results currently omits attachments, causing lost attachment metadata; update that call (store_query_results) to include attachments=context.query_request.attachments so the persisted transcript (as later consumed by the transcript creation in query utilities) retains attachment data, ensuring the interrupted-stream path matches the completed-stream path’s arguments.
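Not the repository's real code — a minimal, self-contained sketch of the kind of regression check that would catch this omission. `persist_interrupted_turn` and the context dict are hypothetical stand-ins for the real interrupted-path call site and its `context` object:

```python
from unittest.mock import Mock

# Hypothetical stand-in for the interrupted-path call site; the real code
# calls store_query_results with many more kwargs.
def persist_interrupted_turn(store_query_results, context, topic_summary):
    """Interrupted-path persistence, forwarding attachments like the completed path."""
    store_query_results(
        user_id=context["user_id"],
        conversation_id=context["conversation_id"],
        query=context["query"],
        attachments=context["attachments"],  # the kwarg the review says is missing
        topic_summary=topic_summary,
    )

store = Mock()
ctx = {
    "user_id": "u1",
    "conversation_id": "c1",
    "query": "What is Kubernetes?",
    "attachments": [{"name": "notes.txt"}],
}
persist_interrupted_turn(store, ctx, topic_summary=None)

# Regression check: attachment metadata must reach the persistence layer.
assert store.call_args.kwargs["attachments"] == [{"name": "notes.txt"}]
```

A test asserting on the mock's kwargs like this would fail against the current interrupted branch, which omits `attachments` entirely.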
🧹 Nitpick comments (1)
tests/unit/app/endpoints/test_streaming_query.py (1)
1457-1524: Cover the topic-summary failure branch too.
Lines 395-401 are the new guard that keeps interrupted persistence alive when `get_topic_summary()` fails, but the added tests only exercise the success and disabled cases. Please add a case where the mock raises and assert we still emit `"interrupted"` and store `topic_summary=None`.

Also applies to: 1526-1589
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1457 - 1524, Add a new test similar to test_generate_response_cancelled_persists_topic_summary_for_new_conversation that simulates get_topic_summary raising an exception: patch app.endpoints.streaming_query.get_topic_summary to an AsyncMock(side_effect=Exception("err")), run generate_response with the same mock_generator, mock_context, mock_responses_params and mock_turn_summary, assert the yielded stream contains an "interrupted" event, assert store_query_results was called and that its call kwargs include topic_summary=None, and verify isolate_stream_interrupt_registry.deregister_stream was called with the request id; this covers the failure branch in generate_response where topic summary lookup fails.
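A simplified, hypothetical version of the behavior the missing test should pin down — `finish_interrupted_turn` stands in for the real `generate_response` guard, which is more involved:

```python
import asyncio
from unittest.mock import AsyncMock, Mock

# Hypothetical simplification of the guard: a failing topic-summary lookup
# must not stop the interrupted turn from being persisted.
async def finish_interrupted_turn(get_topic_summary, store_query_results):
    topic_summary = None
    try:
        topic_summary = await get_topic_summary("What is Kubernetes?")
    except Exception:  # summary is best-effort metadata only
        pass
    store_query_results(topic_summary=topic_summary)
    return "interrupted"

failing_summary = AsyncMock(side_effect=Exception("err"))
store = Mock()
event = asyncio.run(finish_interrupted_turn(failing_summary, store))

assert event == "interrupted"                           # event still emitted
assert store.call_args.kwargs["topic_summary"] is None  # None persisted
```

The real test would additionally patch `app.endpoints.streaming_query.get_topic_summary` and assert the stream's `"interrupted"` event and the `deregister_stream` call, per the prompt above.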
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 94c37806-1e3c-400d-ac40-a1e87a8357df
📒 Files selected for processing (2)
src/app/endpoints/streaming_query.py
tests/unit/app/endpoints/test_streaming_query.py
```python
if not context.query_request.conversation_id:
    should_generate = context.query_request.generate_topic_summary
    if should_generate:
        try:
            logger.debug(
                "Generating topic summary for interrupted new conversation"
            )
            topic_summary = await get_topic_summary(
                context.query_request.query,
                context.client,
                responses_params.model,
            )
```
Don't put an unbounded summary RPC in front of the interrupt ack.
get_topic_summary() does another Llama Stack request (src/utils/responses.py:114-148), and this branch runs before Line 542 yields the "interrupted" event. If that backend is slow or degraded, a user-initiated cancel now waits on best-effort metadata instead of closing promptly. Please bound this call with a short timeout or move it off the response path.
⏱️ Example way to cap the cancel-path latency
```diff
 try:
     logger.debug(
         "Generating topic summary for interrupted new conversation"
     )
-    topic_summary = await get_topic_summary(
-        context.query_request.query,
-        context.client,
-        responses_params.model,
-    )
+    topic_summary = await asyncio.wait_for(
+        get_topic_summary(
+            context.query_request.query,
+            context.client,
+            responses_params.model,
+        ),
+        timeout=2,
+    )
+except TimeoutError as e:
+    logger.warning(
+        "Timed out generating topic summary for interrupted turn, "
+        "request %s: %s",
+        context.request_id,
+        e,
+    )
 except Exception as e:  # pylint: disable=broad-except
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/app/endpoints/streaming_query.py` around lines 383 - 394, This branch
calls get_topic_summary(...) synchronously before yielding the "interrupted"
event, which can block interrupt acknowledgements; wrap the call with a short
timeout (e.g., use asyncio.wait_for around get_topic_summary) and catch
asyncio.TimeoutError so a slow backend doesn't delay the interrupt path, or
alternatively offload it via asyncio.create_task to run in background and never
await it on the critical path; update the code around
context.query_request.conversation_id and should_generate to use the bounded
call or background task and ensure errors/timeouts are logged but do not prevent
emitting the interrupted event.
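The bounded-call pattern suggested above can be demonstrated standalone. This sketch is illustrative only: the 5-second "backend" and 0.05-second timeout are made-up values, and `slow_get_topic_summary` stands in for the real Llama Stack call:

```python
import asyncio

# Self-contained sketch of capping a best-effort call on the cancel path.
async def slow_get_topic_summary() -> str:
    await asyncio.sleep(5)  # simulates a degraded summary backend
    return "too late to matter"

async def interrupted_path():
    topic_summary = None
    try:
        topic_summary = await asyncio.wait_for(
            slow_get_topic_summary(), timeout=0.05
        )
    except asyncio.TimeoutError:  # the built-in TimeoutError on Python 3.11+
        pass  # log and move on; the interrupt ack must not wait
    return topic_summary, "interrupted"

summary, event = asyncio.run(interrupted_path())
assert summary is None          # summary sacrificed to keep the cancel prompt
assert event == "interrupted"   # the ack is emitted without the 5 s wait
```

`asyncio.wait_for` cancels the wrapped coroutine on timeout, so the slow request does not keep running behind the scenes; the `create_task` alternative in the prompt trades that cleanup for zero added latency.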
🧹 Nitpick comments (2)
tests/unit/app/endpoints/test_streaming_query.py (2)
1520-1521: Strengthen persistence assertions before reading call args
At Line 1520 and Line 1650, assert the `store_query_results` invocation count before inspecting kwargs, so duplicate persistence calls cannot slip through.

Suggested test hardening diff

```diff
-        call_kwargs = store_query_results_mock.call_args[1]
+        store_query_results_mock.assert_called_once()
+        call_kwargs = store_query_results_mock.call_args[1]
         assert call_kwargs["topic_summary"] == "Kubernetes container orchestration"
@@
-        call_kwargs = store_query_results_mock.call_args[1]
+        store_query_results_mock.assert_called_once()
+        call_kwargs = store_query_results_mock.call_args[1]
         assert call_kwargs["topic_summary"] is None
```

Also applies to: 1650-1651
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1520 - 1521, The test inspects store_query_results_mock.call_args[1] without first asserting how many times store_query_results was called, which could hide duplicate persistence calls; before reading call_args in the tests that reference store_query_results_mock (e.g., the assertions around topic_summary), add an assertion on store_query_results_mock.assert_called_once() or assert store_query_results_mock.call_count == 1 to ensure exactly one invocation, then read call_kwargs = store_query_results_mock.call_args[1] and assert call_kwargs["topic_summary"] == "Kubernetes container orchestration".
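A generic `unittest.mock` illustration of why this hardening matters — with only a `call_args` read, a duplicate call silently overwrites the kwargs you inspect, while `assert_called_once()` fails loudly:

```python
from unittest.mock import Mock

store_query_results_mock = Mock()
store_query_results_mock(topic_summary="Kubernetes container orchestration")

store_query_results_mock.assert_called_once()  # raises if called 0 or 2+ times
call_kwargs = store_query_results_mock.call_args[1]
assert call_kwargs["topic_summary"] == "Kubernetes container orchestration"

# A second (duplicate) call now makes assert_called_once() raise.
store_query_results_mock(topic_summary=None)
duplicate_detected = False
try:
    store_query_results_mock.assert_called_once()
except AssertionError:
    duplicate_detected = True
assert duplicate_detected
```

Note that `call_args` always reflects only the most recent call, which is exactly how a duplicate persistence call could slip past the original assertions.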
1457-1655: Consider parameterizing the three new cancellation/topic-summary tests
The setup and assertions are largely duplicated; a `pytest.mark.parametrize` matrix for `(generate_topic_summary, get_topic_summary_side_effect, expected_topic_summary)` would keep this easier to maintain.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1457 - 1655, Consolidate the three nearly-identical tests into a single parametrized test using pytest.mark.parametrize over (generate_topic_summary, get_topic_summary_side_effect, expected_topic_summary) to avoid duplication: update the test function (e.g., test_generate_response_cancelled_topic_summary_variants) to accept those params, create the same mock_generator, mock_context (use a single test_request_id or parametrize it), patch app.endpoints.streaming_query.get_topic_summary with either new=mocker.AsyncMock(return_value=...) or new=mocker.AsyncMock(side_effect=...) based on get_topic_summary_side_effect, patch store_query_results and append_turn_to_conversation as before, call generate_response and assert the interrupted event, then assert get_topic_summary was called only when generate_topic_summary is True and expected_topic_summary is used in store_query_results_mock.call_args[1]["topic_summary"], and still assert isolate_stream_interrupt_registry.deregister_stream was called with the request id.
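A hypothetical shape for the consolidation — `fake_generate_response` stands in for the real `generate_response` and its fixture plumbing, which the actual test would patch via mocker:

```python
import asyncio
from unittest.mock import AsyncMock

import pytest

# Stand-in for the guard under test: summary only when enabled, None on failure.
async def fake_generate_response(generate_topic_summary, get_topic_summary):
    topic_summary = None
    if generate_topic_summary:
        try:
            topic_summary = await get_topic_summary("query")
        except Exception:
            pass
    return topic_summary

@pytest.mark.parametrize(
    ("generate_topic_summary", "summary_result", "expected"),
    [
        (True, "Kubernetes container orchestration",
         "Kubernetes container orchestration"),      # success
        (True, Exception("err"), None),              # lookup fails -> persist None
        (False, None, None),                         # feature disabled -> never called
    ],
)
def test_topic_summary_variants(generate_topic_summary, summary_result, expected):
    if isinstance(summary_result, Exception):
        mock = AsyncMock(side_effect=summary_result)
    else:
        mock = AsyncMock(return_value=summary_result)
    assert asyncio.run(
        fake_generate_response(generate_topic_summary, mock)
    ) == expected
    assert mock.called is generate_topic_summary
```

One parametrized test then covers the success, failure, and disabled branches that the three near-identical tests currently duplicate.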
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 16f5caf6-725e-471a-951f-0b05abd4d1e7
📒 Files selected for processing (1)
tests/unit/app/endpoints/test_streaming_query.py
Description
Generates the topic summary for the first conversation that is interrupted.
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
Bug Fixes
Tests