-
Notifications
You must be signed in to change notification settings - Fork 79
feat(memory): Optimize API calls with batched storage for better latency/cost #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Add opt-in storage_version="v2" parameter to AgentCoreMemorySessionManager
that enables batched API calls by using unified actorId.
Changes:
- Add storage_version parameter ("v1" default, "v2" opt-in)
- v2 batches message + agent state in single API call
e730d72 to
46ce469
Compare
| MessageAddedEvent, lambda event: self.save_message_with_state(event.message, event.agent) | ||
| ) | ||
| registry.add_callback(AfterInvocationEvent, lambda event: self._sync_agent_state(event.agent)) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: This v2 branch is missing the multi-agent hooks that the parent class registers:
MultiAgentInitializedEvent→initialize_multi_agentAfterNodeCallEvent→sync_multi_agentAfterMultiAgentInvocationEvent→sync_multi_agent
Multi-agent scenarios will silently fail to persist state with v2. Consider either:
- Adding these hooks explicitly here, or
- Calling
RepositorySessionManager.register_hooks()first, then overriding specific callbacks
Suggested fix:
# Add after line 735:
registry.add_callback(MultiAgentInitializedEvent, lambda event: self.initialize_multi_agent(event.source))
registry.add_callback(AfterNodeCallEvent, lambda event: self.sync_multi_agent(event.source))
registry.add_callback(AfterMultiAgentInvocationEvent, lambda event: self.sync_multi_agent(event.source))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added multi-agent hooks from strands.experimental.hooks.multiagent
| "Synced agent state: event=%s, agent=%s", event.get("event", {}).get("eventId"), agent.agent_id | ||
| ) | ||
| except Exception as e: | ||
| logger.error("Failed to sync agent state: %s", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small: This silently swallows the exception, but save_message_with_state (line 243) raises SessionException on failure. Should this also raise to maintain consistent error handling? Silent failures here could lead to undetected data loss.
Suggested fix:
except Exception as e:
logger.error("Failed to sync agent state: %s", e)
raise SessionException(f"Failed to sync agent state: {e}") from e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed - raises SessionException on failure
| agent_data = json.loads(events[0].get("payload", {})[0].get("blob")) | ||
| return SessionAgent.from_dict(agent_data) | ||
| try: | ||
| if self.storage_version == "v2": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: V2 only queries config.actor_id, but V1 stored agent state under agent_{id}. If a user switches from v1 to v2 on an existing session, their agent state becomes inaccessible (appears as a new agent, losing previous state).
This silently loses access to existing data when upgrading.
Suggested fix - add fallback to v1 location:
# After the v2 loop finds nothing, before returning None:
# Fallback: check v1 location for migration support
events = self.memory_client.list_events(
memory_id=self.config.memory_id,
actor_id=self._get_full_agent_id(agent_id),
session_id=session_id,
max_results=1,
)
if events:
agent_data = json.loads(events[0].get("payload", {})[0].get("blob"))
return SessionAgent.from_dict(agent_data)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed - dual-read with legacy fallback
| logger.error("Failed to save message with state: %s", e) | ||
| raise SessionException(f"Failed to save message with state: {e}") from e | ||
|
|
||
| def _sync_agent_state(self, agent: "Agent") -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optimization: I know you stated one of the main purposes of the PR is to reduce redundant API calls, this isn't a big issue but I believe there is a chance for more optimization here. Agent state is saved twice at the end of each invocation:
- With the final message via
save_message_with_state(line 732) - Again here via
_sync_agent_state
This adds an extra API call per invocation. The final sync is meant to capture conversation_manager_state updates that happen after the last message, but in most cases the state hasn't changed.
Suggested optimization: Track whether state changed since last save, and skip the final sync if unchanged:
# In save_message_with_state, store a hash/snapshot:
self._last_synced_state = hash(json.dumps(session_agent.to_dict(), sort_keys=True))
# In syncagent_state, check before saving:
def syncagent_state(self, agent: "Agent") -> None:
current_state = hash(json.dumps(SessionAgent.from_agent(agent).to_dict(), sort_keys=True))
if current_state == getattr(self, '_last_synced_state', None):
return # No change, skip sync
# ... rest of method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed - skips sync when state unchanged
5f872f9 to
ba2fab9
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #223 +/- ##
=======================================
Coverage ? 90.25%
=======================================
Files ? 35
Lines ? 3488
Branches ? 518
=======================================
Hits ? 3148
Misses ? 190
Partials ? 150
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ba2fab9 to
4ff0e93
Compare
aidandaly24
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me -- approved
Closes #222
Summary
API Call Reduction
Implementation Details
Batched Storage
_save_message_with_state()_typemarkers:"message"and"agent_state"State Hash Tracking
_compute_state_hash()computes hash excluding timestamps (created_at, updated_at)AfterInvocationEventsync is skipped when state is unchangedDual-Read for Backward Compatibility
read_agent()tries new format first (unified actorId with type markers)agent_{id}actorId) for existing dataFiles Changed
session_manager.py: Batched storage, hash tracking, dual-readbedrock_converter.py: Parse new payload formatTest Plan
Breaking Changes
None - existing data is read via legacy fallback, new data uses optimized format.