-
Notifications
You must be signed in to change notification settings - Fork 3.2k
feat(a2a): a2a added #2733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
feat(a2a): a2a added #2733
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR implements comprehensive A2A (Agent-to-Agent) protocol support, enabling Sim workflows to be exposed as A2A-compatible agents and allowing workflows to call external A2A agents. The implementation includes database schema, API endpoints, client tools, UI blocks, and React hooks.
Major Changes:
- Added
a2aAgentanda2aTaskdatabase tables with proper indexing - Implemented JSON-RPC 2.0 compliant API endpoints for agent discovery and task execution
- Created A2A block and three tools (send_task, get_task, cancel_task) for workflow integration
- Added agent card generation from workflow metadata following A2A protocol spec
- Implemented SSE streaming support for real-time task updates
- Added React Query hooks for agent management in the UI
Architecture:
The implementation follows the A2A protocol specification well, with proper separation between agent management (/api/a2a/agents), discovery (/api/a2a/agents/[agentId]), and execution (/api/a2a/serve/[agentId]). The code properly uses absolute imports as per project standards and integrates cleanly with existing Sim infrastructure.
Issues Found:
- Timeout parameter exists in UI but is marked "informational only" - confusing UX
executionIdis stored but never utilized for task tracking- No actual cancellation mechanism when timeouts occur
- Raw SQL used for task counting instead of Drizzle aggregations
The code is generally well-structured and follows project conventions. Most issues are minor style/UX improvements rather than functional bugs.
Confidence Score: 4/5
- This PR is safe to merge with minor style improvements recommended
- The implementation is comprehensive and follows the A2A protocol specification correctly. Database schema is properly designed with indexes and constraints. API endpoints handle authentication, validation, and error cases appropriately. The code follows project conventions for absolute imports and structure. Issues identified are non-critical style/UX improvements (confusing timeout parameter, unused executionId field, raw SQL that could use ORM methods). No security vulnerabilities or critical bugs found. Score of 4 reflects solid implementation with minor polish needed.
apps/sim/app/api/a2a/serve/[agentId]/route.ts- complex streaming logic with timeout handling that could be improved
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| packages/db/schema.ts | 5/5 | Added a2aAgent and a2aTask tables with proper indexes and constraints |
| apps/sim/app/api/a2a/agents/route.ts | 4/5 | Agent listing and creation endpoint with validation, one potential issue with duplicate checks |
| apps/sim/app/api/a2a/agents/[agentId]/route.ts | 5/5 | Agent CRUD operations with proper auth checks and validation |
| apps/sim/app/api/a2a/serve/[agentId]/route.ts | 3/5 | Complex A2A protocol implementation with streaming support, several issues identified |
| apps/sim/lib/a2a/types.ts | 5/5 | Complete A2A protocol type definitions, well-structured |
| apps/sim/tools/a2a/send_task.ts | 5/5 | Tool implementation for sending tasks to A2A agents |
Sequence Diagram
sequenceDiagram
participant Client as A2A Client
participant ServeAPI as /api/a2a/serve/[agentId]
participant DB as Database
participant WorkflowAPI as /api/workflows/[id]/execute
participant Workflow as Workflow Engine
Note over Client,Workflow: Task Send Flow (tasks/send)
Client->>ServeAPI: POST JSON-RPC tasks/send
ServeAPI->>DB: Verify agent is published
ServeAPI->>DB: Check workflow is deployed
alt Continuing existing task
ServeAPI->>DB: Load existing task
ServeAPI->>ServeAPI: Validate not in terminal state
end
ServeAPI->>DB: Create/update task (status: working)
ServeAPI->>WorkflowAPI: POST execute with task input
WorkflowAPI->>Workflow: Execute workflow blocks
Workflow-->>WorkflowAPI: Return output
WorkflowAPI-->>ServeAPI: Return execution result
ServeAPI->>DB: Update task (status: completed/failed)
ServeAPI-->>Client: Return Task with messages
Note over Client,Workflow: Task Query Flow (tasks/get)
Client->>ServeAPI: POST JSON-RPC tasks/get
ServeAPI->>DB: Query task by ID
ServeAPI-->>Client: Return Task status & messages
Note over Client,Workflow: Task Cancel Flow (tasks/cancel)
Client->>ServeAPI: POST JSON-RPC tasks/cancel
ServeAPI->>DB: Verify task exists & not terminal
ServeAPI->>DB: Update task (status: cancelled)
ServeAPI-->>Client: Return cancelled Task
Note over Client,Workflow: Streaming Flow (tasks/sendSubscribe)
Client->>ServeAPI: POST JSON-RPC tasks/sendSubscribe
ServeAPI->>DB: Create/update task (status: working)
ServeAPI->>WorkflowAPI: POST execute with streaming
loop Stream chunks
WorkflowAPI-->>ServeAPI: SSE chunk
ServeAPI-->>Client: SSE task:message event
end
WorkflowAPI-->>ServeAPI: Complete
ServeAPI->>DB: Update task (status: completed)
ServeAPI-->>Client: SSE task:status & task:done events
| status: finalStatus, | ||
| messages, | ||
| artifacts, | ||
| executionId: executeResult.metadata?.executionId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executionId stored but never used - consider using it for task tracking or remove if unnecessary
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| }, | ||
| triggerType: 'api', | ||
| }), | ||
| signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout applied but no cancellation mechanism - if fetch times out, task status in DB may not reflect actual state
apps/sim/blocks/blocks/a2a.ts
Outdated
| placeholder: '300000', | ||
| defaultValue: '300000', | ||
| description: 'Request timeout in milliseconds (informational - system uses default timeout)', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout described as "informational only" creates confusing UX - either make it functional or remove from UI
apps/sim/tools/a2a/send_task.ts
Outdated
| type: 'number', | ||
| default: A2A_DEFAULT_TIMEOUT, | ||
| description: | ||
| 'Request timeout in milliseconds (informational only - system uses default timeout)', | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout parameter is defined but never passed to request - either implement or remove from params schema
| taskCount: sql<number>`( | ||
| SELECT COUNT(*)::int | ||
| FROM "a2a_task" | ||
| WHERE "a2a_task"."agent_id" = "a2a_agent"."id" | ||
| )`.as('task_count'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw SQL subquery - consider using Drizzle's aggregation methods for better type safety and consistency
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
Implements comprehensive A2A (Agent-to-Agent) protocol support, enabling Sim Studio workflows to be exposed as discoverable A2A agents and allowing workflows to call external A2A-compatible agents.
Key Changes:
- Added three API endpoints for agent management (
/api/a2a/agents) and JSON-RPC task execution (/api/a2a/serve/[agentId]) - Implemented complete A2A protocol types following the specification (agent cards, tasks, messages, artifacts, JSON-RPC 2.0)
- Created database schema with
a2aAgentanda2aTasktables including proper indexes and cascading deletes - Added A2A block and three tools (send_task, get_task, cancel_task) for calling external agents
- Built React Query hooks for agent management with proper cache invalidation
- Supports both streaming (SSE) and non-streaming task execution with multi-turn conversations via sessions
- Validates workflows have Start blocks before agent creation and requires deployment before publishing
Architecture Highlights:
- Agent cards are auto-generated from workflow metadata with skills derived from input format
- Authentication handled via
checkHybridAuthwith support for Bearer tokens and API keys - Task state tracked in database with proper status transitions and message history
- Streaming implementation handles both SSE and JSON responses from workflow executor
- Proper error handling with A2A-specific error codes aligned with JSON-RPC
Code Quality:
- Follows all style guidelines: absolute imports, proper TypeScript typing (no
any), TSDoc comments - Uses
createLoggerfrom@sim/loggerinstead of console.log - Follows established patterns for API routes, blocks, tools, and React Query hooks
- Proper barrel exports with index.ts files for lib/a2a and tools/a2a modules
Confidence Score: 4/5
- This PR is safe to merge with minimal risk - the implementation is well-structured and follows established patterns
- Score reflects solid implementation following all coding standards with proper authentication, validation, and error handling. The PR adds significant new functionality (19 files) but is well-architected. No critical bugs or security issues found. Minor point deducted only for lack of tests mentioned in the checklist.
- No files require special attention - all implementations follow proper patterns and include appropriate error handling
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| apps/sim/app/api/a2a/serve/[agentId]/route.ts | 4/5 | Implements A2A JSON-RPC server with task execution, streaming, and SSE support - handles authentication and workflow integration correctly |
| apps/sim/app/api/a2a/agents/route.ts | 4/5 | List and create A2A agents - validates workflow deployment and start block requirements, prevents duplicate agents per workflow |
| apps/sim/app/api/a2a/agents/[agentId]/route.ts | 4/5 | CRUD operations for A2A agents with publish/refresh actions - properly checks workflow deployment before publishing |
| apps/sim/blocks/blocks/a2a.ts | 5/5 | A2A block configuration for calling external agents - well-structured with clear inputs/outputs and documentation |
| apps/sim/lib/a2a/agent-card.ts | 5/5 | Agent card generation from workflow metadata - correctly transforms workflow inputs to JSON Schema for A2A protocol |
| apps/sim/lib/a2a/types.ts | 5/5 | Complete A2A protocol type definitions - comprehensive coverage of agent cards, tasks, messages, and error codes |
| apps/sim/hooks/queries/a2a-agents.ts | 5/5 | React Query hooks for A2A agent management - follows established patterns with proper cache invalidation |
| packages/db/schema.ts | 5/5 | Database schema additions for A2A agents and tasks - proper indexes, constraints, and cascading deletes |
Sequence Diagram
sequenceDiagram
participant Client as External A2A Client
participant ServeAPI as A2A Serve API
participant Auth as Authentication
participant DB as Database
participant WorkflowAPI as Workflow API
participant Executor as Workflow Executor
Note over Client,Executor: A2A Task Send Flow
Client->>ServeAPI: POST JSON-RPC tasks/send
ServeAPI->>DB: Query agent and verify published
DB-->>ServeAPI: Agent metadata
ServeAPI->>Auth: Verify credentials
Auth-->>ServeAPI: Authentication result
ServeAPI->>DB: Check workflow deployed
DB-->>ServeAPI: Deployment status
alt New Task
ServeAPI->>DB: Insert new task record
else Continue Task
ServeAPI->>DB: Update existing task
DB-->>ServeAPI: Existing messages
end
ServeAPI->>WorkflowAPI: POST execute with task message
WorkflowAPI->>Executor: Execute workflow
Executor-->>WorkflowAPI: Execution result
WorkflowAPI-->>ServeAPI: Output and artifacts
ServeAPI->>DB: Update task with result
ServeAPI-->>Client: JSON-RPC response with Task
Note over Client,Executor: A2A Streaming Flow
Client->>ServeAPI: POST JSON-RPC tasks/sendSubscribe
ServeAPI->>DB: Create or update task
ServeAPI->>WorkflowAPI: POST with streaming enabled
loop Streaming chunks
WorkflowAPI-->>ServeAPI: Stream chunk
ServeAPI-->>Client: SSE task:message event
end
WorkflowAPI-->>ServeAPI: Complete
ServeAPI->>DB: Update task status to completed
ServeAPI-->>Client: SSE task:status final
ServeAPI-->>Client: SSE task:done
Note over Client,Executor: Agent Management
participant UI as UI Client
participant AgentAPI as Agent API
UI->>AgentAPI: POST create agent
AgentAPI->>Auth: Verify user access
AgentAPI->>DB: Check workflow has Start block
AgentAPI->>DB: Generate skills from workflow
AgentAPI->>DB: Insert agent record
AgentAPI-->>UI: Agent created
UI->>AgentAPI: POST publish action
AgentAPI->>DB: Verify workflow deployed
AgentAPI->>DB: Update published status
AgentAPI-->>UI: Agent published
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
Implements Agent-to-Agent (A2A) protocol v0.2.6 support, enabling workflows to be exposed as A2A-compatible agents and allowing workflows to call external A2A agents. Adds comprehensive API endpoints for agent management (CRUD operations), task execution via JSON-RPC, and client tools for interacting with A2A agents. Includes database schema for agents and tasks with proper indexes and cascade deletes.
Confidence Score: 2/5
- Moderate risk due to authentication issues and missing error handling in critical paths
- The implementation has several logic issues that could affect functionality: (1) Auth header handling allows JWT checks to block API key fallbacks via Authorization header, (2) Failed workflow executions don't add error messages to task history, leaving clients unable to retrieve error details, (3) Streaming error path has the same issue with missing error messages in history. Additionally, there are concerns about API key forwarding to workflow execution endpoints and performance implications of unbounded task counting.
- apps/sim/app/api/a2a/serve/[agentId]/route.ts and apps/sim/lib/auth/hybrid.ts require fixes for error handling and authentication logic
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| apps/sim/app/api/a2a/serve/[agentId]/route.ts | 2/5 | Implements JSON-RPC A2A protocol endpoints for task execution; missing error messages in task history on failures, auth forwarding needs verification |
| apps/sim/lib/auth/hybrid.ts | 2/5 | Modified auth logic to support API keys via Authorization header; JWT check happens first, potentially blocking API key fallback |
| apps/sim/app/api/a2a/agents/route.ts | 3/5 | Adds agent CRUD endpoints with proper validation and uniqueness constraints; SQL subquery for task counts could impact performance at scale |
| packages/db/schema.ts | 4/5 | Adds a2aAgent and a2aTask tables with proper indexes and cascade deletes; schema aligns well with A2A protocol v0.2.6 |
Sequence Diagram
sequenceDiagram
participant Client as A2A Client
participant Serve as /api/a2a/serve/[id]
participant Auth as checkHybridAuth
participant DB as Database
participant Workflow as /api/workflows/execute
Client->>Serve: POST tasks/send (JSON-RPC)
Serve->>Auth: Verify credentials
Auth-->>Serve: userId, authType
Serve->>DB: Load agent & verify published
DB-->>Serve: Agent metadata
Serve->>DB: Create/update a2aTask record
DB-->>Serve: Task created
Serve->>Workflow: POST execute (with X-API-Key)
Workflow-->>Serve: Execution result
Serve->>DB: Update task with result & agent message
DB-->>Serve: Task updated
Serve-->>Client: JSON-RPC response with Task object
Note over Client,Workflow: Error Path
Workflow-->>Serve: Execution error
Serve->>DB: Mark task as failed (no error in history)
Serve-->>Client: JSON-RPC error response
| const errorMessage = error instanceof Error ? error.message : 'Workflow execution failed' | ||
|
|
||
| await db | ||
| .update(a2aTask) | ||
| .set({ | ||
| status: 'failed', | ||
| updatedAt: new Date(), | ||
| completedAt: new Date(), | ||
| }) | ||
| .where(eq(a2aTask.id, taskId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When workflow execution fails, the error message is not added to the task history before the task is marked as failed. This means clients won't see the error details in the message history—they'll only see a failed status. Add the error as an agent message to the history before updating the task.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/a2a/serve/[agentId]/route.ts
Line: 393:402
Comment:
When workflow execution fails, the error message is not added to the task history before the task is marked as failed. This means clients won't see the error details in the message history—they'll only see a failed status. Add the error as an agent message to the history before updating the task.
How can I resolve this? If you propose a fix, please make it concise.| // 3. Try API key auth (check both X-API-Key and Authorization: Bearer as fallback) | ||
| const bearerToken = authHeader?.startsWith('Bearer ') ? authHeader.slice(7) : null | ||
| const apiKeyHeader = request.headers.get('x-api-key') || bearerToken |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Auth header is checked twice—once for JWT verification (line 34) and again here for API key extraction (line 117). When an API key is passed via Authorization: Bearer <api-key>, it will first be checked as a JWT token and rejected if invalid. The bearerToken fallback logic was added but the JWT check happens first, preventing API keys from working via the Authorization header unless they're also valid JWTs.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/lib/auth/hybrid.ts
Line: 116:118
Comment:
Auth header is checked twice—once for JWT verification (line 34) and again here for API key extraction (line 117). When an API key is passed via `Authorization: Bearer <api-key>`, it will first be checked as a JWT token and rejected if invalid. The bearerToken fallback logic was added but the JWT check happens first, preventing API keys from working via the Authorization header unless they're also valid JWTs.
How can I resolve this? If you propose a fix, please make it concise.| taskCount: sql<number>`( | ||
| SELECT COUNT(*)::int | ||
| FROM "a2a_task" | ||
| WHERE "a2a_task"."agent_id" = "a2a_agent"."id" | ||
| )`.as('task_count'), | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQL subquery counts all tasks for each agent without filtering. For agents with thousands of tasks, this could slow down the agent list endpoint. Consider adding a time-based filter (e.g., tasks from last 30 days) or caching task counts.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/a2a/agents/route.ts
Line: 71:76
Comment:
SQL subquery counts all tasks for each agent without filtering. For agents with thousands of tasks, this could slow down the agent list endpoint. Consider adding a time-based filter (e.g., tasks from last 30 days) or caching task counts.
How can I resolve this? If you propose a fix, please make it concise.| logger.error(`Streaming error for task ${taskId}:`, error) | ||
|
|
||
| await db | ||
| .update(a2aTask) | ||
| .set({ | ||
| status: 'failed', | ||
| completedAt: new Date(), | ||
| updatedAt: new Date(), | ||
| }) | ||
| .where(eq(a2aTask.id, taskId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When streaming fails, the error is logged and sent to the client, but the task's history is not updated with the error message. Similar to the non-streaming path, add the error as an agent message to the history so clients can retrieve the error details via tasks/get.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/a2a/serve/[agentId]/route.ts
Line: 711:720
Comment:
When streaming fails, the error is logged and sent to the client, but the task's history is not updated with the error message. Similar to the non-streaming path, add the error as an agent message to the history so clients can retrieve the error details via `tasks/get`.
How can I resolve this? If you propose a fix, please make it concise.| const executeUrl = `${getBaseUrl()}/api/workflows/${agent.workflowId}/execute` | ||
| const headers: Record<string, string> = { 'Content-Type': 'application/json' } | ||
| if (apiKey) headers['X-API-Key'] = apiKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API key is extracted from request headers but passed via internal X-API-Key header to the workflow execution endpoint. Verify that /api/workflows/:id/execute accepts and validates the X-API-Key header, otherwise authentication will fail for A2A requests.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/a2a/serve/[agentId]/route.ts
Line: 327:329
Comment:
The API key is extracted from request headers but passed via internal `X-API-Key` header to the workflow execution endpoint. Verify that `/api/workflows/:id/execute` accepts and validates the `X-API-Key` header, otherwise authentication will fail for A2A requests.
How can I resolve this? If you propose a fix, please make it concise.| input: messageText, | ||
| triggerType: 'api', | ||
| }), | ||
| signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timeout is set to 5 minutes (300,000ms) which could cause long-running workflows to fail mid-execution. Consider making this timeout configurable per agent or using a longer default for complex workflows.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/a2a/serve/[agentId]/route.ts
Line: 344:344
Comment:
Timeout is set to 5 minutes (300,000ms) which could cause long-running workflows to fail mid-execution. Consider making this timeout configurable per agent or using a longer default for complex workflows.
<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>
How can I resolve this? If you propose a fix, please make it concise.
Summary
Implemented a2a support.
Type of Change
Testing
Solo.
Checklist