interceptCall(
/**
* Safely extracts peer information from the ServerCall.
- *
+ *
* @param serverCall the gRPC ServerCall
* @return peer information string, or "unknown" if not available
*/
diff --git a/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java
new file mode 100644
index 000000000..f67811e5d
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java
@@ -0,0 +1,51 @@
+package io.a2a.server.interceptors;
+
+import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Represents the context of a method invocation for interceptors.
+ *
+ * This record captures the target object, method being invoked, and parameters
+ * passed to the method. It's used by interceptors to extract attributes and
+ * execute the actual method invocation.
+ *
+ * The parameters array is defensively copied to prevent external modification,
+ * ensuring immutability of the InvocationContext after construction.
+ *
+ * @param target the object on which the method is being invoked (never null)
+ * @param method the method being invoked (may be null if method resolution fails)
+ * @param parameters the parameters passed to the method (may be null or empty)
+ */
+public record InvocationContext(@NonNull Object target, @NonNull Method method, Object @Nullable[] parameters) {
+
+ /**
+ * Compact constructor with validation and defensive copying.
+ */
+ public InvocationContext {
+ if (target == null) {
+ throw new IllegalArgumentException("target cannot be null");
+ }
+ if (method == null) {
+ throw new IllegalArgumentException("method cannot be null");
+ }
+ }
+
+ /**
+ * Invokes the method on the target with the provided parameters.
+ *
+ * @return the result of the method invocation
+ * @throws IllegalStateException if method is null
+ * @throws IllegalAccessException if the method is not accessible
+ * @throws IllegalArgumentException if parameters don't match method signature
+ * @throws InvocationTargetException if the invoked method throws an exception
+ */
+ public @Nullable Object proceed() throws IllegalAccessException,
+ IllegalArgumentException,
+ InvocationTargetException {
+ return method.invoke(target, parameters);
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Kind.java b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java
new file mode 100644
index 000000000..51e093908
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java
@@ -0,0 +1,5 @@
+package io.a2a.server.interceptors;
+
+public enum Kind {
+ INTERNAL, SERVER, CLIENT, PRODUCER, CONSUMER;
+}
diff --git a/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java
new file mode 100644
index 000000000..8e027b46b
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java
@@ -0,0 +1,14 @@
+package io.a2a.server.interceptors;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class NoAttributeExtractor implements Supplier>> {
+
+ @Override
+ public Function> get() {
+ return ctx -> Collections.emptyMap();
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Trace.java b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java
new file mode 100644
index 000000000..7cdbad288
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java
@@ -0,0 +1,27 @@
+package io.a2a.server.interceptors;
+
+import jakarta.enterprise.util.Nonbinding;
+import jakarta.interceptor.InterceptorBinding;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Framework-agnostic annotation for method tracing.
+ * Works with both Jakarta EE CDI interceptors and Spring AOP.
+ */
+@InterceptorBinding
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Inherited
+public @interface Trace {
+ @Nonbinding
+ Kind kind() default Kind.SERVER;
+ @Nonbinding
+ Class extends Supplier>>> extractor() default NoAttributeExtractor.class;
+}
diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
index e19f7920d..4f55c7552 100644
--- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
+++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
@@ -1,5 +1,6 @@
package io.a2a.server.requesthandlers;
+import static io.a2a.server.interceptors.Kind.SERVER;
import static io.a2a.server.util.async.AsyncUtils.convertingProcessor;
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
import static io.a2a.server.util.async.AsyncUtils.processor;
@@ -37,6 +38,7 @@
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.QueueManager;
import io.a2a.server.events.TaskQueueExistsException;
+import io.a2a.server.interceptors.Trace;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
@@ -278,6 +280,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws A2AError {
LOGGER.debug("onGetTask {}", params.id());
Task task = taskStore.get(params.id());
@@ -311,6 +314,7 @@ private static Task limitTaskHistory(Task task, @Nullable Integer historyLength)
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws A2AError {
LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}",
params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter());
@@ -333,6 +337,7 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws A2AError {
Task task = taskStore.get(params.id());
if (task == null) {
@@ -385,6 +390,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError {
LOGGER.debug("onMessageSend - task: {}; context {}", params.message().taskId(), params.message().contextId());
MessageSendSetup mss = initMessageSend(params, context);
@@ -520,6 +526,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public Flow.Publisher onMessageSendStream(
MessageSendParams params, ServerCallContext context) throws A2AError {
LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}",
@@ -682,6 +689,7 @@ private void startBackgroundConsumption() {
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public TaskPushNotificationConfig onSetTaskPushNotificationConfig(
TaskPushNotificationConfig params, ServerCallContext context) throws A2AError {
if (pushConfigStore == null) {
@@ -697,6 +705,7 @@ public TaskPushNotificationConfig onSetTaskPushNotificationConfig(
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public TaskPushNotificationConfig onGetTaskPushNotificationConfig(
GetTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
if (pushConfigStore == null) {
@@ -729,6 +738,7 @@ private PushNotificationConfig getPushNotificationConfig(ListTaskPushNotificatio
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public Flow.Publisher onResubscribeToTask(
TaskIdParams params, ServerCallContext context) throws A2AError {
LOGGER.debug("onResubscribeToTask - taskId: {}", params.id());
@@ -760,6 +770,7 @@ public Flow.Publisher onResubscribeToTask(
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig(
ListTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
if (pushConfigStore == null) {
@@ -773,6 +784,7 @@ public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig(
}
@Override
+ @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER)
public void onDeleteTaskPushNotificationConfig(
DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
if (pushConfigStore == null) {
diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java
new file mode 100644
index 000000000..c9d289139
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java
@@ -0,0 +1,65 @@
+package io.a2a.server.requesthandlers;
+
+import io.a2a.server.ServerCallContext;
+import io.a2a.server.interceptors.InvocationContext;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class RequestHandlerAttributeExtractor implements Supplier>> {
+
+ @Override
+ @SuppressWarnings("NullAway") // Null checks performed inline
+ public Function> get() {
+ return ctx -> {
+ if (ctx == null || ctx.method() == null) {
+ return Collections.emptyMap();
+ }
+
+ String method = ctx.method().getName();
+ if (method == null) {
+ return Collections.emptyMap();
+ }
+
+ Object[] parameters = ctx.parameters();
+ if (parameters == null || parameters.length < 2) {
+ return Collections.emptyMap();
+ }
+
+ switch (method) {
+ case "onMessageSend",
+ "onMessageSendStream",
+ "onCancelTask",
+ "onResubscribeToTask",
+ "getPushNotificationConfig",
+ "setPushNotificationConfig",
+ "onGetTask",
+ "listPushNotificationConfig",
+ "deletePushNotificationConfig",
+ "onListTasks" -> {
+ if (parameters[0] == null || parameters[1] == null) {
+ return Collections.emptyMap();
+ }
+
+ ServerCallContext context = (ServerCallContext) parameters[1];
+ Map attributes = new HashMap<>();
+ attributes.put("request", parameters[0].toString());
+ attributes.put("extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")));
+
+ String a2aMethod = (String) context.getState().get("method");
+ if (a2aMethod != null) {
+ attributes.put("a2a.method", a2aMethod);
+ }
+
+ return attributes;
+ }
+ default -> {
+ return Collections.emptyMap();
+ }
+ }
+ };
+ }
+}
diff --git a/spec/src/main/java/io/a2a/spec/Message.java b/spec/src/main/java/io/a2a/spec/Message.java
index 4383b1e02..978af4c40 100644
--- a/spec/src/main/java/io/a2a/spec/Message.java
+++ b/spec/src/main/java/io/a2a/spec/Message.java
@@ -94,6 +94,11 @@ public static Builder builder(Message message) {
return new Builder(message);
}
+ @Override
+ public String toString() {
+ return "Message{" + "role=" + role + ", parts=" + parts + ", messageId=" + messageId + ", contextId=" + contextId + ", taskId=" + taskId + ", metadata=" + metadata + ", referenceTaskIds=" + referenceTaskIds + ", extensions=" + extensions + '}';
+ }
+
/**
* Defines the role of the message sender in the conversation.
*
diff --git a/spec/src/main/java/io/a2a/spec/Task.java b/spec/src/main/java/io/a2a/spec/Task.java
index 24336dcd9..b51db72ce 100644
--- a/spec/src/main/java/io/a2a/spec/Task.java
+++ b/spec/src/main/java/io/a2a/spec/Task.java
@@ -103,6 +103,11 @@ public static Builder builder(Task task) {
return new Builder(task);
}
+ @Override
+ public String toString() {
+ return "Task{" + "id=" + id + ", contextId=" + contextId + ", status=" + status + ", artifacts=" + artifacts + ", history=" + history + ", metadata=" + metadata + '}';
+ }
+
/**
* Builder for constructing immutable {@link Task} instances.
*
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java
index a025236f0..2338a7446 100644
--- a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java
@@ -1,16 +1,20 @@
package io.a2a.transport.grpc.context;
+
+import java.util.Map;
+
+import io.a2a.spec.A2AMethods;
import io.grpc.Context;
/**
* Shared gRPC context keys for A2A protocol data.
- *
+ *
* These keys provide access to gRPC context information similar to
* Python's grpc.aio.ServicerContext, enabling rich context access
* in service method implementations.
*/
public final class GrpcContextKeys {
-
+
/**
* Context key for storing the X-A2A-Version header value.
* Set by server interceptors and accessed by service handlers.
@@ -36,9 +40,16 @@ public final class GrpcContextKeys {
* Context key for storing the method name being called.
* Equivalent to Python's context.method() functionality.
*/
- public static final Context.Key METHOD_NAME_KEY =
+ public static final Context.Key GRPC_METHOD_NAME_KEY =
Context.key("grpc-method-name");
+ /**
+ * Context key for storing the method name being called.
+ * Equivalent to Python's context.method() functionality.
+ */
+ public static final Context.Key METHOD_NAME_KEY =
+ Context.key("method");
+
/**
* Context key for storing the peer information.
* Provides access to client connection details.
@@ -46,6 +57,18 @@ public final class GrpcContextKeys {
public static final Context.Key PEER_INFO_KEY =
Context.key("grpc-peer-info");
+ public static final Map METHOD_MAPPING = Map.of(
+ "SendMessage", A2AMethods.SEND_MESSAGE_METHOD,
+ "SendStreamingMessage", A2AMethods.SEND_STREAMING_MESSAGE_METHOD,
+ "GetTask", A2AMethods.GET_TASK_METHOD,
+ "ListTask", A2AMethods.LIST_TASK_METHOD,
+ "CancelTask", A2AMethods.CANCEL_TASK_METHOD,
+ "SubscribeToTask", A2AMethods.SUBSCRIBE_TO_TASK_METHOD,
+ "CreateTaskPushNotification", A2AMethods.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD,
+ "GetTaskPushNotification", A2AMethods.GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD,
+ "ListTaskPushNotification", A2AMethods.LIST_TASK_PUSH_NOTIFICATION_CONFIG_METHOD,
+ "DeleteTaskPushNotification", A2AMethods.DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD);
+
private GrpcContextKeys() {
// Utility class
}
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java
new file mode 100644
index 000000000..7c1211f9f
--- /dev/null
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java
@@ -0,0 +1,88 @@
+package io.a2a.transport.grpc.handler;
+
+import io.grpc.Context;
+import io.a2a.server.interceptors.InvocationContext;
+import io.a2a.transport.grpc.context.GrpcContextKeys;
+import org.jspecify.annotations.NonNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Extracts OpenTelemetry trace attributes from gRPC invocation context.
+ *
+ * This extractor pulls relevant information from gRPC method calls to populate
+ * span attributes for distributed tracing.
+ */
+public class GrpcAttributeExtractor implements Supplier>> {
+
+ @Override
+ public @NonNull Function> get() {
+ return ctx -> {
+ if (ctx == null || ctx.method() == null) {
+ return Collections.emptyMap();
+ }
+
+ String method = ctx.method().getName();
+ if (method == null) {
+ return Collections.emptyMap();
+ }
+
+ Object[] parameters = ctx.parameters();
+ if (parameters == null || parameters.length == 0) {
+ return Collections.emptyMap();
+ }
+
+ switch (method) {
+ case "sendMessage",
+ "getTask",
+ "listTasks",
+ "cancelTask",
+ "createTaskPushNotificationConfig",
+ "getTaskPushNotificationConfig",
+ "listTaskPushNotificationConfig",
+ "sendStreamingMessage",
+ "subscribeToTask",
+ "deleteTaskPushNotificationConfig" -> {
+ return extractAttributes(parameters);
+ }
+ default -> {
+ return Collections.emptyMap();
+ }
+ }
+ };
+ }
+
+ /**
+ * Extracts trace attributes from method parameters and gRPC context.
+ *
+ * @param parameters the method invocation parameters
+ * @return map of attribute key-value pairs for tracing
+ */
+ private @NonNull Map extractAttributes(Object @NonNull [] parameters) {
+ Context currentContext = Context.current();
+ Map attributes = new HashMap<>();
+
+ // Add request parameter if available
+ if (parameters.length > 0 && parameters[0] != null) {
+ attributes.put("gen_ai.agent.a2a.request", parameters[0].toString());
+ }
+
+ // Add gRPC extensions header if available
+ String extensions = GrpcContextKeys.EXTENSIONS_HEADER_KEY.get();
+ if (extensions != null) {
+ attributes.put("extensions", extensions);
+ }
+
+ // Add gRPC operation name if available
+ String operationName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext);
+ if (operationName != null) {
+ attributes.put("gen_ai.agent.operation.name", operationName);
+ }
+
+ return attributes;
+ }
+}
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
index 2998965ea..1f8d285b2 100644
--- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
@@ -2,6 +2,7 @@
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
import static io.a2a.grpc.utils.ProtoUtils.ToProto;
+import static io.a2a.server.interceptors.Kind.SERVER;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,6 +27,7 @@
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.auth.User;
import io.a2a.server.extensions.A2AExtensions;
+import io.a2a.server.interceptors.Trace;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.version.A2AVersionValidator;
import io.a2a.spec.A2AError;
@@ -59,13 +61,14 @@
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import org.jspecify.annotations.Nullable;
@Vetoed
public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase {
// Hook so testing can wait until streaming subscriptions are established.
// Without this we get intermittent failures
- private static volatile Runnable streamingSubscribedRunnable;
+ private static volatile @Nullable Runnable streamingSubscribedRunnable;
private final AtomicBoolean initialised = new AtomicBoolean(false);
@@ -76,6 +79,7 @@ public GrpcHandler() {
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void sendMessage(io.a2a.grpc.SendMessageRequest request,
StreamObserver responseObserver) {
try {
@@ -97,6 +101,7 @@ public void sendMessage(io.a2a.grpc.SendMessageRequest request,
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void getTask(io.a2a.grpc.GetTaskRequest request,
StreamObserver responseObserver) {
try {
@@ -119,6 +124,7 @@ public void getTask(io.a2a.grpc.GetTaskRequest request,
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void listTasks(io.a2a.grpc.ListTasksRequest request,
StreamObserver responseObserver) {
try {
@@ -137,6 +143,7 @@ public void listTasks(io.a2a.grpc.ListTasksRequest request,
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void cancelTask(io.a2a.grpc.CancelTaskRequest request,
StreamObserver responseObserver) {
try {
@@ -159,6 +166,7 @@ public void cancelTask(io.a2a.grpc.CancelTaskRequest request,
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void setTaskPushNotificationConfig(io.a2a.grpc.SetTaskPushNotificationConfigRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().pushNotifications()) {
@@ -182,6 +190,7 @@ public void setTaskPushNotificationConfig(io.a2a.grpc.SetTaskPushNotificationCon
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationConfigRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().pushNotifications()) {
@@ -205,6 +214,7 @@ public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationCon
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationConfigRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().pushNotifications()) {
@@ -229,6 +239,7 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().streaming()) {
@@ -253,6 +264,7 @@ public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request,
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void subscribeToTask(io.a2a.grpc.SubscribeToTaskRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().streaming()) {
@@ -278,12 +290,14 @@ private void convertToStreamResponse(Flow.Publisher publishe
StreamObserver responseObserver) {
CompletableFuture.runAsync(() -> {
publisher.subscribe(new Flow.Subscriber() {
- private Flow.Subscription subscription;
+ private Flow.@Nullable Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
- subscription.request(1);
+ if (this.subscription != null) {
+ this.subscription.request(1);
+ }
// Notify tests that we are subscribed
Runnable runnable = streamingSubscribedRunnable;
@@ -299,7 +313,9 @@ public void onNext(StreamingEventKind event) {
if (response.hasStatusUpdate() && response.getStatusUpdate().getFinal()) {
responseObserver.onCompleted();
} else {
- subscription.request(1);
+ if (this.subscription != null) {
+ this.subscription.request(1);
+ }
}
}
@@ -333,6 +349,7 @@ public void getExtendedAgentCard(io.a2a.grpc.GetExtendedAgentCardRequest request
}
@Override
+ @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER)
public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificationConfigRequest request,
StreamObserver responseObserver) {
if (!getAgentCardInternal().capabilities().pushNotifications()) {
@@ -383,7 +400,7 @@ private ServerCallContext createCallContext(StreamObserver responseObserv
state.put("grpc_metadata", grpcMetadata);
}
- String methodName = GrpcContextKeys.METHOD_NAME_KEY.get(currentContext);
+ String methodName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext);
if (methodName != null) {
state.put("grpc_method_name", methodName);
}
@@ -549,7 +566,7 @@ public static void setStreamingSubscribedRunnable(Runnable runnable) {
*
* @return the version header value, or null if not available
*/
- private String getVersionFromContext() {
+ private @Nullable String getVersionFromContext() {
try {
return GrpcContextKeys.VERSION_HEADER_KEY.get();
} catch (Exception e) {
@@ -565,7 +582,7 @@ private String getVersionFromContext() {
*
* @return the extensions header value, or null if not available
*/
- private String getExtensionsFromContext() {
+ private @Nullable String getExtensionsFromContext() {
try {
return GrpcContextKeys.EXTENSIONS_HEADER_KEY.get();
} catch (Exception e) {
@@ -585,7 +602,7 @@ private String getExtensionsFromContext() {
* @param key the context key to retrieve
* @return the context value, or null if not available
*/
- private static T getFromContext(Context.Key key) {
+ private static @Nullable T getFromContext(Context.Key key) {
try {
return key.get();
} catch (Exception e) {
@@ -600,7 +617,7 @@ private static T getFromContext(Context.Key key) {
*
* @return the gRPC Metadata object, or null if not available
*/
- protected static io.grpc.Metadata getCurrentMetadata() {
+ protected static io.grpc.@Nullable Metadata getCurrentMetadata() {
return getFromContext(GrpcContextKeys.METADATA_KEY);
}
@@ -610,8 +627,8 @@ protected static io.grpc.Metadata getCurrentMetadata() {
*
* @return the method name, or null if not available
*/
- protected static String getCurrentMethodName() {
- return getFromContext(GrpcContextKeys.METHOD_NAME_KEY);
+ protected static @Nullable String getCurrentMethodName() {
+ return getFromContext(GrpcContextKeys.GRPC_METHOD_NAME_KEY);
}
/**
@@ -620,7 +637,7 @@ protected static String getCurrentMethodName() {
*
* @return the peer information, or null if not available
*/
- protected static String getCurrentPeerInfo() {
+ protected static @Nullable String getCurrentPeerInfo() {
return getFromContext(GrpcContextKeys.PEER_INFO_KEY);
}
}
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java
new file mode 100644
index 000000000..0cc667b2d
--- /dev/null
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java
@@ -0,0 +1,9 @@
+/*
+ * Copyright The WildFly Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+@NullMarked
+package io.a2a.transport.grpc.handler;
+
+import org.jspecify.annotations.NullMarked;
+
diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java
new file mode 100644
index 000000000..8b88eeecc
--- /dev/null
+++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java
@@ -0,0 +1,235 @@
+package io.a2a.transport.grpc.handler;
+
+import io.a2a.server.interceptors.InvocationContext;
+import io.a2a.transport.grpc.context.GrpcContextKeys;
+import io.grpc.Context;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class GrpcAttributeExtractorTest {
+
+ private GrpcAttributeExtractor extractor;
+ private Context previousContext;
+
+ @BeforeEach
+ void setUp() {
+ extractor = new GrpcAttributeExtractor();
+ // Save the current context to restore it later
+ previousContext = Context.current();
+ }
+
+ @AfterEach
+ void tearDown() {
+ // Restore the previous context
+ if (previousContext != null) {
+ previousContext.attach();
+ }
+ }
+
+ @Test
+ void testExtractAttributes_SendMessage_Success() throws Exception {
+ // Create a mock method
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ Object[] parameters = new Object[]{"test-request"};
+
+ // Set up gRPC context with test values
+ Context ctx = Context.current()
+ .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a.SendMessage")
+ .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1,ext2");
+
+ Context previous = ctx.attach();
+ try {
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertEquals(3, attributes.size());
+ assertEquals("test-request", attributes.get("gen_ai.agent.a2a.request"));
+ assertEquals("ext1,ext2", attributes.get("extensions"));
+ assertEquals("a2a.SendMessage", attributes.get("gen_ai.agent.operation.name"));
+ } finally {
+ ctx.detach(previous);
+ }
+ }
+
+ @Test
+ void testExtractAttributes_NullContext() {
+ Function> function = extractor.get();
+ Map attributes = function.apply(null);
+
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ void testExtractAttributes_NullMethod() throws Exception {
+ Object[] parameters = new Object[]{"test-request"};
+ assertThrows(IllegalArgumentException.class, () -> new InvocationContext(new TestService(), null, parameters));
+ }
+
+ @Test
+ void testExtractAttributes_NullParameters() throws Exception {
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, null);
+
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ void testExtractAttributes_EmptyParameters() throws Exception {
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ Object[] parameters = new Object[]{};
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ void testExtractAttributes_NullParameter() throws Exception {
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ Object[] parameters = new Object[]{null};
+
+ Context ctx = Context.current()
+ .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a.SendMessage");
+
+ Context previous = ctx.attach();
+ try {
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ // Should have operation name but not request (since parameter is null)
+ assertEquals(1, attributes.size());
+ assertEquals("a2a.SendMessage", attributes.get("gen_ai.agent.operation.name"));
+ } finally {
+ ctx.detach(previous);
+ }
+ }
+
+ @Test
+ void testExtractAttributes_NoGrpcContextKeys() throws Exception {
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ Object[] parameters = new Object[]{"test-request"};
+
+ // Don't set any context values
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertEquals(1, attributes.size());
+ assertEquals("test-request", attributes.get("gen_ai.agent.a2a.request"));
+ assertFalse(attributes.containsKey("extensions"));
+ assertFalse(attributes.containsKey("gen_ai.agent.operation.name"));
+ }
+
+ @Test
+ void testExtractAttributes_UnknownMethod() throws Exception {
+ Method method = TestService.class.getMethod("unknownMethod");
+ Object[] parameters = new Object[]{};
+
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ void testExtractAttributes_AllSupportedMethods() throws Exception {
+ String[] supportedMethods = {
+ "sendMessage",
+ "getTask",
+ "listTasks",
+ "cancelTask",
+ "createTaskPushNotificationConfig",
+ "getTaskPushNotificationConfig",
+ "listTaskPushNotificationConfig",
+ "sendStreamingMessage",
+ "subscribeToTask",
+ "deleteTaskPushNotificationConfig"
+ };
+
+ for (String methodName : supportedMethods) {
+ Method method = TestService.class.getMethod(methodName, Object.class);
+ Object[] parameters = new Object[]{"request-" + methodName};
+
+ Context ctx = Context.current()
+ .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a." + methodName)
+ .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1");
+
+ Context previous = ctx.attach();
+ try {
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes, "Attributes should not be null for method: " + methodName);
+ assertFalse(attributes.isEmpty(), "Attributes should not be empty for method: " + methodName);
+ assertEquals("request-" + methodName, attributes.get("gen_ai.agent.a2a.request"),
+ "Request attribute should match for method: " + methodName);
+ assertEquals("ext1", attributes.get("extensions"),
+ "Extensions should match for method: " + methodName);
+ assertEquals("a2a." + methodName, attributes.get("gen_ai.agent.operation.name"),
+ "Operation name should match for method: " + methodName);
+ } finally {
+ ctx.detach(previous);
+ }
+ }
+ }
+
+ @Test
+ void testExtractAttributes_OnlyExtensions() throws Exception {
+ Method method = TestService.class.getMethod("sendMessage", Object.class);
+ Object[] parameters = new Object[]{null};
+
+ Context ctx = Context.current()
+ .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1,ext2,ext3");
+
+ Context previous = ctx.attach();
+ try {
+ InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters);
+ Function> function = extractor.get();
+ Map attributes = function.apply(invocationCtx);
+
+ assertNotNull(attributes);
+ assertEquals(1, attributes.size());
+ assertEquals("ext1,ext2,ext3", attributes.get("extensions"));
+ } finally {
+ ctx.detach(previous);
+ }
+ }
+
+ // Test service class with all supported methods
+ public static class TestService {
+ public void sendMessage(Object request) {}
+ public void getTask(Object request) {}
+ public void listTasks(Object request) {}
+ public void cancelTask(Object request) {}
+ public void createTaskPushNotificationConfig(Object request) {}
+ public void getTaskPushNotificationConfig(Object request) {}
+ public void listTaskPushNotificationConfig(Object request) {}
+ public void sendStreamingMessage(Object request) {}
+ public void subscribeToTask(Object request) {}
+ public void deleteTaskPushNotificationConfig(Object request) {}
+ public void unknownMethod() {}
+ }
+}
diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java
new file mode 100644
index 000000000..485f8adba
--- /dev/null
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java
@@ -0,0 +1,53 @@
+package io.a2a.transport.jsonrpc.handler;
+
+import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY;
+
+import io.a2a.server.ServerCallContext;
+import io.a2a.server.interceptors.InvocationContext;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class JSONRPCAttributeExtractor implements Supplier>> {
+
+ @Override
+ public Function> get() {
+ return ctx -> {
+ String method = ctx.method().getName();
+ Object[] parameters = ctx.parameters() == null ? new Object[]{} : ctx.parameters();
+ if( ctx.parameters() == null || ctx.parameters().length < 2) {
+ throw new IllegalArgumentException("wrong parameters passed");
+ }
+ switch (method) {
+ case "onMessageSend",
+ "onMessageSendStream",
+ "onCancelTask",
+ "onResubscribeToTask",
+ "getPushNotificationConfig",
+ "setPushNotificationConfig",
+ "onGetTask",
+ "listPushNotificationConfig",
+ "deletePushNotificationConfig",
+ "onListTasks" -> {
+ ServerCallContext context = (ServerCallContext) parameters[1];
+ Map attributes = new HashMap<>();
+ attributes.put("gen_ai.agent.a2a.request", parameters[0].toString());
+ attributes.put("gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")));
+
+ String operationName = (String) context.getState().get(METHOD_NAME_KEY);
+ if (operationName != null) {
+ attributes.put("gen_ai.agent.operation.name", operationName);
+ }
+
+ return attributes;
+ }
+ default -> {
+ return Collections.emptyMap();
+ }
+ }
+ };
+ }
+}
diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java
index ca8149099..72ad40acc 100644
--- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java
@@ -1,5 +1,6 @@
package io.a2a.transport.jsonrpc.handler;
+import static io.a2a.server.interceptors.Kind.SERVER;
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
import java.util.concurrent.CompletableFuture;
@@ -37,13 +38,13 @@
import io.a2a.server.PublicAgentCard;
import io.a2a.server.ServerCallContext;
import io.a2a.server.extensions.A2AExtensions;
+import io.a2a.server.interceptors.Trace;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.util.async.Internal;
import io.a2a.server.version.A2AVersionValidator;
import io.a2a.spec.A2AError;
import io.a2a.spec.AgentCard;
import io.a2a.spec.ExtendedCardNotConfiguredError;
-import io.a2a.spec.ExtensionSupportRequiredError;
import io.a2a.spec.EventKind;
import io.a2a.spec.InternalError;
import io.a2a.spec.InvalidRequestError;
@@ -53,7 +54,6 @@
import io.a2a.spec.Task;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
-import io.a2a.spec.VersionNotSupportedError;
import mutiny.zero.ZeroPublisher;
import org.jspecify.annotations.Nullable;
@@ -98,6 +98,7 @@ public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler reque
this(agentCard, null, requestHandler, executor);
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallContext context) {
try {
A2AVersionValidator.validateProtocolVersion(agentCard, context);
@@ -111,7 +112,7 @@ public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallC
}
}
-
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public Flow.Publisher onMessageSendStream(
SendStreamingMessageRequest request, ServerCallContext context) {
if (!agentCard.capabilities().streaming()) {
@@ -136,6 +137,7 @@ public Flow.Publisher onMessageSendStream(
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallContext context) {
try {
Task task = requestHandler.onCancelTask(request.getParams(), context);
@@ -150,6 +152,7 @@ public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallCont
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public Flow.Publisher onSubscribeToTask(
SubscribeToTaskRequest request, ServerCallContext context) {
if (!agentCard.capabilities().streaming()) {
@@ -158,7 +161,6 @@ public Flow.Publisher onSubscribeToTask(
request.getId(),
new InvalidRequestError("Streaming is not supported by the agent")));
}
-
try {
Flow.Publisher publisher =
requestHandler.onResubscribeToTask(request.getParams(), context);
@@ -172,6 +174,7 @@ public Flow.Publisher onSubscribeToTask(
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public GetTaskPushNotificationConfigResponse getPushNotificationConfig(
GetTaskPushNotificationConfigRequest request, ServerCallContext context) {
if (!agentCard.capabilities().pushNotifications()) {
@@ -189,6 +192,7 @@ public GetTaskPushNotificationConfigResponse getPushNotificationConfig(
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public SetTaskPushNotificationConfigResponse setPushNotificationConfig(
SetTaskPushNotificationConfigRequest request, ServerCallContext context) {
if (!agentCard.capabilities().pushNotifications()) {
@@ -206,6 +210,7 @@ public SetTaskPushNotificationConfigResponse setPushNotificationConfig(
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext context) {
try {
Task task = requestHandler.onGetTask(request.getParams(), context);
@@ -217,6 +222,7 @@ public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext conte
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext context) {
try {
ListTasksResult result = requestHandler.onListTasks(request.getParams(), context);
@@ -228,6 +234,7 @@ public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public ListTaskPushNotificationConfigResponse listPushNotificationConfig(
ListTaskPushNotificationConfigRequest request, ServerCallContext context) {
if ( !agentCard.capabilities().pushNotifications()) {
@@ -245,6 +252,7 @@ public ListTaskPushNotificationConfigResponse listPushNotificationConfig(
}
}
+ @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER)
public DeleteTaskPushNotificationConfigResponse deletePushNotificationConfig(
DeleteTaskPushNotificationConfigRequest request, ServerCallContext context) {
if ( !agentCard.capabilities().pushNotifications()) {
diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java
new file mode 100644
index 000000000..03427fdd6
--- /dev/null
+++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java
@@ -0,0 +1,100 @@
+package io.a2a.transport.rest.handler;
+
+import static io.a2a.transport.rest.context.RestContextKeys.METHOD_NAME_KEY;
+
+import io.a2a.server.ServerCallContext;
+import io.a2a.server.interceptors.InvocationContext;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.jspecify.annotations.Nullable;
+
+public class RestAttributeExtractor implements Supplier>> {
+
+ @Override
+ public Function> get() {
+ return ctx -> {
+ String method = ctx.method().getName();
+ Object[] parameters = ctx.parameters() == null ? new Object[]{} : ctx.parameters();
+ if( ctx.parameters() == null || ctx.parameters().length < 2) {
+ throw new IllegalArgumentException("wrong parameters passed");
+ }
+ switch (method) {
+ case "sendMessage",
+ "sendStreamingMessage"-> {
+ ServerCallContext context = (ServerCallContext) parameters[2];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.request", (String) parameters[0]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ case "setTaskPushNotificationConfiguration" -> {
+ ServerCallContext context = (ServerCallContext) parameters[3];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]);
+ putIfNotNull(result, "gen_ai.agent.a2a.request", (String) parameters[1]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ case "cancelTask",
+ "subscribeToTask",
+ "listTaskPushNotificationConfigurations" -> {
+ ServerCallContext context = (ServerCallContext) parameters[2];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ case "getTask" -> {
+ ServerCallContext context = (ServerCallContext) parameters[3];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]);
+ putIfNotNull(result, "gen_ai.agent.a2a.historyLength", "" + (int) parameters[1]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ case "getTaskPushNotificationConfiguration",
+ "deleteTaskPushNotificationConfiguration" -> {
+ ServerCallContext context = (ServerCallContext) parameters[3];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]);
+ putIfNotNull(result, "gen_ai.agent.a2a.configId", (String) parameters[1]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ case "listTasks" -> {
+ ServerCallContext context = (ServerCallContext) parameters[7];
+ Map result = new HashMap<>();
+ putIfNotNull(result, "gen_ai.agent.a2a.contextId", (String) parameters[0]);
+ putIfNotNull(result, "gen_ai.agent.a2a.status", (String) parameters[1]);
+ result.putAll(processServerCallContext(context));
+ return result;
+ }
+ default -> {
+ return Collections.emptyMap();
+ }
+ }
+ };
+ }
+
+ private Map processServerCallContext(ServerCallContext context) {
+ Map attributes = new HashMap<>();
+ attributes.put("gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")));
+
+ String operationName = (String)context.getState().get(METHOD_NAME_KEY);
+ if (operationName != null) {
+ attributes.put("gen_ai.agent.a2a.operation.name", operationName);
+ }
+
+ return attributes;
+ }
+
+ private void putIfNotNull(Map map, String key, @Nullable String value) {
+ if (value != null) {
+ map.put(key, value);
+ }
+ }
+}
diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
index 489bf909b..70a603476 100644
--- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
+++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
@@ -1,5 +1,6 @@
package io.a2a.transport.rest.handler;
+import static io.a2a.server.interceptors.Kind.SERVER;
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
import static io.a2a.spec.A2AErrorCodes.JSON_PARSE_ERROR_CODE;
@@ -29,6 +30,7 @@
import io.a2a.server.PublicAgentCard;
import io.a2a.server.ServerCallContext;
import io.a2a.server.extensions.A2AExtensions;
+import io.a2a.server.interceptors.Trace;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.version.A2AVersionValidator;
import io.a2a.server.util.async.Internal;
@@ -105,7 +107,9 @@ public RestHandler(AgentCard agentCard, RequestHandler requestHandler, Executor
this.executor = executor;
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse sendMessage(String body, String tenant, ServerCallContext context) {
+
try {
A2AVersionValidator.validateProtocolVersion(agentCard, context);
A2AExtensions.validateRequiredExtensions(agentCard, context);
@@ -121,6 +125,7 @@ public HTTPRestResponse sendMessage(String body, String tenant, ServerCallContex
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse sendStreamingMessage(String body, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().streaming()) {
@@ -140,6 +145,7 @@ public HTTPRestResponse sendStreamingMessage(String body, String tenant, ServerC
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse cancelTask(String taskId, String tenant, ServerCallContext context) {
try {
if (taskId == null || taskId.isEmpty()) {
@@ -158,6 +164,7 @@ public HTTPRestResponse cancelTask(String taskId, String tenant, ServerCallConte
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, String body, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().pushNotifications()) {
@@ -175,6 +182,7 @@ public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, Stri
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse subscribeToTask(String taskId, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().streaming()) {
@@ -190,6 +198,7 @@ public HTTPRestResponse subscribeToTask(String taskId, String tenant, ServerCall
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse getTask(String taskId, @Nullable Integer historyLength, String tenant, ServerCallContext context) {
try {
TaskQueryParams params = new TaskQueryParams(taskId, historyLength, tenant);
@@ -205,6 +214,7 @@ public HTTPRestResponse getTask(String taskId, @Nullable Integer historyLength,
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String status,
@Nullable Integer pageSize, @Nullable String pageToken,
@Nullable Integer historyLength, @Nullable String lastUpdatedAfter,
@@ -253,6 +263,7 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nullable String configId, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().pushNotifications()) {
@@ -268,6 +279,7 @@ public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nul
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, int pageSize, String pageToken, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().pushNotifications()) {
@@ -283,6 +295,7 @@ public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, in
}
}
+ @Trace(extractor = RestAttributeExtractor.class, kind = SERVER)
public HTTPRestResponse deleteTaskPushNotificationConfiguration(String taskId, String configId, String tenant, ServerCallContext context) {
try {
if (!agentCard.capabilities().pushNotifications()) {