diff --git a/.gitignore b/.gitignore index dcc2f1cb8..e96770c17 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ pom.xml.versionsBackup release.properties .flattened-pom.xml +#Claude +CLAUDE.md + # Eclipse .project .classpath @@ -20,6 +23,7 @@ bin/ # NetBeans nb-configuration.xml +nbactions.xml # Visual Studio Code .vscode diff --git a/boms/extras/pom.xml b/boms/extras/pom.xml index c06575595..de35306a8 100644 --- a/boms/extras/pom.xml +++ b/boms/extras/pom.xml @@ -34,6 +34,11 @@ a2a-java-extras-common ${project.version} + + ${project.groupId} + a2a-java-sdk-opentelemetry + ${project.version} + ${project.groupId} a2a-java-extras-task-store-database-jpa diff --git a/boms/extras/src/it/extras-usage-test/pom.xml b/boms/extras/src/it/extras-usage-test/pom.xml index 5beeb50ea..356eccb58 100644 --- a/boms/extras/src/it/extras-usage-test/pom.xml +++ b/boms/extras/src/it/extras-usage-test/pom.xml @@ -44,6 +44,10 @@ io.github.a2asdk a2a-java-extras-common + + io.github.a2asdk + a2a-java-sdk-opentelemetry + io.github.a2asdk a2a-java-extras-task-store-database-jpa diff --git a/client/base/src/main/java/io/a2a/client/ClientBuilder.java b/client/base/src/main/java/io/a2a/client/ClientBuilder.java index c8d2ab6be..b06ab96a0 100644 --- a/client/base/src/main/java/io/a2a/client/ClientBuilder.java +++ b/client/base/src/main/java/io/a2a/client/ClientBuilder.java @@ -6,14 +6,17 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import io.a2a.client.config.ClientConfig; import io.a2a.client.transport.spi.ClientTransport; import io.a2a.client.transport.spi.ClientTransportConfig; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; import io.a2a.client.transport.spi.ClientTransportProvider; +import io.a2a.client.transport.spi.ClientTransportWrapper; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentInterface; @@ -21,6 +24,9 @@ import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Builder for creating instances of {@link Client} to communicate with A2A agents. *

@@ -96,6 +102,7 @@ public class ClientBuilder { private static final Map>> transportProviderRegistry = new HashMap<>(); private static final Map, String> transportProtocolMapping = new HashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(ClientBuilder.class); static { ServiceLoader loader = ServiceLoader.load(ClientTransportProvider.class); @@ -108,7 +115,8 @@ public class ClientBuilder { private final AgentCard agentCard; private final List> consumers = new ArrayList<>(); - private @Nullable Consumer streamErrorHandler; + private @Nullable + Consumer streamErrorHandler; private ClientConfig clientConfig = new ClientConfig.Builder().build(); private final Map, ClientTransportConfig> clientTransports = new LinkedHashMap<>(); @@ -318,7 +326,7 @@ private ClientTransport buildClientTransport() throws A2AClientException { throw new A2AClientException("Missing required TransportConfig for " + agentInterface.protocolBinding()); } - return clientTransportProvider.create(clientTransportConfig, agentCard, agentInterface); + return wrap(clientTransportProvider.create(clientTransportConfig, agentCard, agentInterface), clientTransportConfig); } private Map getServerPreferredTransports() throws A2AClientException { @@ -373,10 +381,50 @@ private AgentInterface findBestClientTransport() throws A2AClientException { if (transportProtocol == null || transportUrl == null) { throw new A2AClientException("No compatible transport found"); } - if (! transportProviderRegistry.containsKey(transportProtocol)) { + if (!transportProviderRegistry.containsKey(transportProtocol)) { throw new A2AClientException("No client available for " + transportProtocol); } return new AgentInterface(transportProtocol, transportUrl); } + + /** + * Wraps the transport with all available transport wrappers discovered via ServiceLoader. + * Wrappers are applied in priority order (highest priority first). + * + * @param transport the base transport to wrap + * @param clientTransportConfig the transport configuration + * @return the wrapped transport (or original if no wrappers are available/applicable) + */ + private ClientTransport wrap(ClientTransport transport, ClientTransportConfig clientTransportConfig) { + ServiceLoader wrapperLoader = ServiceLoader.load(ClientTransportWrapper.class); + + // Collect all wrappers and sort by natural order (uses Comparable implementation) + List wrappers = wrapperLoader.stream().map(Provider::get) + .sorted() + .collect(Collectors.toList()); + + if (wrappers.isEmpty()) { + LOGGER.debug("No client transport wrappers found via ServiceLoader"); + return transport; + } + + // Apply wrappers in priority order + ClientTransport wrapped = transport; + for (ClientTransportWrapper wrapper : wrappers) { + try { + ClientTransport newWrapped = wrapper.wrap(wrapped, clientTransportConfig); + if (newWrapped != wrapped) { + LOGGER.debug("Applied transport wrapper: {} (priority: {})", + wrapper.getClass().getName(), wrapper.priority()); + } + wrapped = newWrapped; + } catch (Exception e) { + LOGGER.warn("Failed to apply transport wrapper {}: {}", + wrapper.getClass().getName(), e.getMessage(), e); + } + } + + return wrapped; + } } diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java index 241541a32..e78c67271 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java @@ -48,7 +48,7 @@ * @see A2AHttpClient * @see io.a2a.client.http.JdkA2AHttpClient */ -public class RestTransportConfig extends ClientTransportConfig { +public class RestTransportConfig extends ClientTransportConfig { private final @Nullable A2AHttpClient httpClient; diff --git a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java index 9c05fac59..1b4af677c 100644 --- a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java +++ b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java @@ -1,7 +1,10 @@ package io.a2a.client.transport.spi; import java.util.ArrayList; + +import java.util.HashMap; import java.util.List; +import java.util.Map; import io.a2a.client.transport.spi.interceptors.ClientCallInterceptor; @@ -35,6 +38,7 @@ public abstract class ClientTransportConfig { protected List interceptors = new ArrayList<>(); + protected Map parameters = new HashMap<>(); /** * Set the list of request/response interceptors. @@ -63,4 +67,12 @@ public void setInterceptors(List interceptors) { public List getInterceptors() { return java.util.Collections.unmodifiableList(interceptors); } + + public void setParameters(Map parameters) { + this.parameters = new HashMap<>(parameters); + } + + public Map getParameters() { + return parameters; + } } diff --git a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java new file mode 100644 index 000000000..25dba33b9 --- /dev/null +++ b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java @@ -0,0 +1,81 @@ +package io.a2a.client.transport.spi; + +/** + * Service provider interface for wrapping client transports with additional functionality. + * Implementations can add cross-cutting concerns like tracing, metrics, logging, etc. + * + *

Wrappers are discovered via Java's ServiceLoader mechanism. To register a wrapper, + * create a file {@code META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper} + * containing the fully qualified class name of your implementation. + * + *

Wrappers are sorted by priority in descending order (highest priority first). + * This interface implements {@link Comparable} to enable natural sorting. + * + *

Example implementation: + *

{@code
+ * public class TracingWrapper implements ClientTransportWrapper {
+ *     @Override
+ *     public ClientTransport wrap(ClientTransport transport, ClientTransportConfig config) {
+ *         if (config.getParameters().containsKey("tracer")) {
+ *             return new TracingTransport(transport, (Tracer) config.getParameters().get("tracer"));
+ *         }
+ *         return transport;
+ *     }
+ *
+ *     @Override
+ *     public int priority() {
+ *         return 100; // Higher priority = wraps earlier (outermost)
+ *     }
+ * }
+ * }
+ */ +public interface ClientTransportWrapper extends Comparable { + + /** + * Wraps the given transport with additional functionality. + * + *

Implementations should check the configuration to determine if they should + * actually wrap the transport. If the wrapper is not applicable (e.g., required + * configuration is missing), return the original transport unchanged. + * + * @param transport the transport to wrap + * @param config the transport configuration, may contain wrapper-specific parameters + * @return the wrapped transport, or the original if wrapping is not applicable + */ + ClientTransport wrap(ClientTransport transport, ClientTransportConfig config); + + /** + * Returns the priority of this wrapper. Higher priority wrappers are applied first + * (wrap the transport earlier, resulting in being the outermost wrapper). + * + *

Default priority is 0. Suggested ranges: + *

+ * + * @return the priority value, higher values = higher priority + */ + default int priority() { + return 0; + } + + /** + * Compares this wrapper with another based on priority. + * Returns a negative integer, zero, or a positive integer as this wrapper + * has higher priority than, equal to, or lower priority than the specified wrapper. + * + *

Note: This comparison is reversed (higher priority comes first) to enable + * natural sorting in descending priority order. + * + * @param other the wrapper to compare to + * @return negative if this has higher priority, positive if lower, zero if equal + */ + @Override + default int compareTo(ClientTransportWrapper other) { + // Reverse comparison: higher priority should come first + return Integer.compare(other.priority(), this.priority()); + } +} diff --git a/examples/helloworld/client/README.md b/examples/helloworld/client/README.md index ac01c890f..7f484607c 100644 --- a/examples/helloworld/client/README.md +++ b/examples/helloworld/client/README.md @@ -41,9 +41,9 @@ The Python A2A server is part of the [a2a-samples](https://github.com/google-a2a The server will start running on `http://localhost:9999`. -## Run the Java A2A Client with JBang +## Run the Java A2A Client -The Java client can be run using JBang, which allows you to run Java source files directly without any manual compilation. +The Java client can be run using either Maven or JBang. ### Build the A2A Java SDK @@ -54,9 +54,23 @@ cd /path/to/a2a-java mvn clean install ``` -### Using the JBang script +### Option 1: Using Maven (Recommended) -A JBang script is provided in the example directory to make running the client easy: +Run the client using Maven's exec plugin: + +```bash +cd examples/helloworld/client +mvn exec:java +``` + +To enable OpenTelemetry with Maven: +```bash +mvn exec:java -Dopentelemetry=true +``` + +### Option 2: Using JBang + +A JBang script is provided for running the client without Maven: 1. Make sure you have JBang installed. If not, follow the [JBang installation guide](https://www.jbang.dev/documentation/guide/latest/installation.html). @@ -70,20 +84,99 @@ A JBang script is provided in the example directory to make running the client e jbang HelloWorldRunner.java ``` -This script automatically handles the dependencies and sources for you. +To enable OpenTelemetry with JBang: +```bash +jbang -Dopentelemetry=true HelloWorldRunner.java +``` ## What the Example Does The Java client (`HelloWorldClient.java`) performs the following actions: 1. Fetches the server's public agent card -2. Fetches the server's extended agent card +2. Fetches the server's extended agent card 3. Creates a client using the extended agent card that connects to the Python server at `http://localhost:9999`. 4. Sends a regular message asking "how much is 10 USD in INR?". 5. Prints the server's response. 6. Sends the same message as a streaming request. 7. Prints each chunk of the server's streaming response as it arrives. +## Enable OpenTelemetry (Optional) + +The client includes support for distributed tracing with OpenTelemetry. To enable it: + +### Prerequisites + +**IMPORTANT**: The client expects an OpenTelemetry collector to be ready and accepting traces. You have two options: + +#### Option 1: Use the Java Server Example (Recommended) + +Instead of the Python server, use the Java server example which has built-in OpenTelemetry support: + +1. **Start the Java server with OpenTelemetry enabled**: + ```bash + cd examples/helloworld/server + mvn quarkus:dev -Popentelemetry + ``` + This will: + - Start the server at `http://localhost:9999` + - Launch Grafana at `http://localhost:3001` + - Start OTLP collectors on ports 5317 (gRPC) and 5318 (HTTP) + +2. **Run the client with OpenTelemetry**: + + Using Maven (from `examples/helloworld/client`): + ```bash + mvn exec:java -Dopentelemetry=true + ``` + + Or using JBang (from `examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/`): + ```bash + jbang -Dopentelemetry=true HelloWorldRunner.java + ``` + +3. **View traces in Grafana**: + - Open `http://localhost:3001` (credentials: admin/admin) + - Go to "Explore" → select "Tempo" data source + - View distributed traces showing the full request flow from client to server + +#### Option 2: Use External OpenTelemetry Collector + +If you want to use the Python server with OpenTelemetry: + +1. **Start an OpenTelemetry collector** on port 5317 (e.g., using Docker): + ```bash + docker run -p 5317:4317 otel/opentelemetry-collector + ``` + +2. **Run the Python server** as described above + +3. **Run the client with OpenTelemetry**: + ```bash + jbang -Dopentelemetry=true HelloWorldRunner.java + ``` + +### What Gets Traced + +When OpenTelemetry is enabled, the client traces: +- Agent card fetching (public and extended) +- Message sending (blocking and streaming) +- Task operations (get, cancel, list) +- Push notification configuration operations +- Connection and transport layer operations + +Client traces are automatically linked with server traces (when using the Java server), providing end-to-end visibility of the entire A2A protocol flow. + +### Configuration + +The client is configured to send traces to `http://localhost:5317` (OTLP gRPC endpoint). To use a different endpoint, modify the `initOpenTelemetry()` method in `HelloWorldClient.java`: + +```java +OtlpGrpcSpanExporter.builder() + .setEndpoint("http://your-collector:4317") + .build() +``` + ## Notes - Make sure the Python server is running before starting the Java client. diff --git a/examples/helloworld/client/pom.xml b/examples/helloworld/client/pom.xml index 0ac460bc5..97243fb51 100644 --- a/examples/helloworld/client/pom.xml +++ b/examples/helloworld/client/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-client - Java SDK A2A Examples + Java SDK A2A Examples - HelloWorld Client Examples for the Java SDK for the Agent2Agent Protocol (A2A) @@ -27,7 +27,22 @@ io.github.a2asdk a2a-java-sdk-jsonrpc-common - ${project.version} + + + io.github.a2asdk + a2a-java-sdk-opentelemetry + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.opentelemetry + opentelemetry-exporter-logging @@ -61,6 +76,20 @@ + + org.codehaus.mojo + exec-maven-plugin + 3.6.2 + + io.a2a.examples.helloworld.HelloWorldClient + + + opentelemetry + ${opentelemetry} + + + + \ No newline at end of file diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java index c3b1cb473..e6e18891c 100644 --- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java +++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java @@ -21,6 +21,11 @@ import io.a2a.spec.Message; import io.a2a.spec.Part; import io.a2a.spec.TextPart; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; /** * A simple example of using the A2A Java SDK to communicate with an A2A server. @@ -32,6 +37,7 @@ public class HelloWorldClient { private static final String MESSAGE_TEXT = "how much is 10 USD in INR?"; public static void main(String[] args) { + OpenTelemetrySdk openTelemetrySdk = null; try { AgentCard finalAgentCard = null; AgentCard publicAgentCard = new A2ACardResolver("http://localhost:9999").getAgentCard(); @@ -81,24 +87,36 @@ public static void main(String[] args) { messageResponse.completeExceptionally(error); }; + JSONRPCTransportConfig transportConfig = new JSONRPCTransportConfig(); + if (Boolean.getBoolean("opentelemetry")) { + openTelemetrySdk = initOpenTelemetry(); + transportConfig.setParameters(Map.of("io.a2a.extras.opentelemetry.Tracer", + openTelemetrySdk.getTracer("helloworld-client"))); + } Client client = Client .builder(finalAgentCard) .addConsumers(consumers) .streamingErrorHandler(streamingErrorHandler) - .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) + .withTransport(JSONRPCTransport.class, transportConfig) .build(); Message message = A2A.toUserMessage(MESSAGE_TEXT); // the message ID will be automatically generated for you - - System.out.println("Sending message: " + MESSAGE_TEXT); - client.sendMessage(message); - System.out.println("Message sent successfully. Responses will be handled by the configured consumers."); - try { + System.out.println("Sending message: " + MESSAGE_TEXT); + client.sendMessage(message); + System.out.println("Message sent successfully. Responses will be handled by the configured consumers."); + String responseText = messageResponse.get(); System.out.println("Response: " + responseText); } catch (Exception e) { System.err.println("Failed to get response: " + e.getMessage()); + } finally { + // Ensure OpenTelemetry SDK is properly shut down to export all pending spans + if (openTelemetrySdk != null) { + System.out.println("Shutting down OpenTelemetry SDK..."); + openTelemetrySdk.close(); + System.out.println("OpenTelemetry SDK shutdown complete."); + } } } catch (Exception e) { System.err.println("An error occurred: " + e.getMessage()); @@ -106,4 +124,21 @@ public static void main(String[] args) { } } -} \ No newline at end of file + static OpenTelemetrySdk initOpenTelemetry() { + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder( + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:5317") + .build() + ).build()) + .setResource(Resource.getDefault().toBuilder() + .put("service.version", "1.0") + .put("service.name", "helloworld-client") + .build()) + .build(); + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .build(); + } +} diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java index 0c19ffad7..bbcd2a268 100644 --- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java +++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java @@ -1,6 +1,11 @@ ///usr/bin/env jbang "$0" "$@" ; exit $? + //DEPS io.github.a2asdk:a2a-java-sdk-client:1.0.0.Alpha1-SNAPSHOT //DEPS io.github.a2asdk:a2a-java-sdk-client-transport-jsonrpc:1.0.0.Alpha1-SNAPSHOT +//DEPS io.github.a2asdk:a2a-java-sdk-opentelemetry:1.0.0.Alpha1-SNAPSHOT +//DEPS io.opentelemetry:opentelemetry-sdk:1.44.1 +//DEPS io.opentelemetry:opentelemetry-exporter-otlp:1.44.1 +//DEPS io.opentelemetry:opentelemetry-exporter-logging:1.44.1 //SOURCES HelloWorldClient.java /** diff --git a/examples/helloworld/pom.xml b/examples/helloworld/pom.xml index ddb562e60..035c7489d 100644 --- a/examples/helloworld/pom.xml +++ b/examples/helloworld/pom.xml @@ -34,25 +34,6 @@ - - - - io.quarkus - quarkus-maven-plugin - true - - - - build - generate-code - generate-code-tests - - - - - - - client server diff --git a/examples/helloworld/server/README.md b/examples/helloworld/server/README.md index 5573dce09..017db4d69 100644 --- a/examples/helloworld/server/README.md +++ b/examples/helloworld/server/README.md @@ -61,6 +61,36 @@ The Python A2A client (`test_client.py`) performs the following actions: 6. Sends the same message as a streaming request. 7. Prints each chunk of the server's streaming response as it arrives. +## Enable OpenTelemetry (Optional) + +The server includes support for distributed tracing with OpenTelemetry. To enable it: + +1. **Run with the OpenTelemetry profile**: + ```bash + mvn quarkus:dev -Popentelemetry + ``` + +2. **Access Grafana dashboard**: + - Quarkus Dev Services will automatically start a Grafana observability stack + - Open Grafana at `http://localhost:3001` (default credentials: admin/admin) + - View traces in the "Explore" section using the Tempo data source + +3. **What gets traced**: + - All A2A protocol operations (send message, get task, cancel task, etc.) + - Streaming message responses + - Task lifecycle events + - Custom operations in your `AgentExecutor` implementation (using `@Trace` annotation) + +4. **Configuration**: + - OpenTelemetry settings are in `application.properties` + - OTLP exporters run on ports 5317 (gRPC) and 5318 (HTTP) + - To use a custom OTLP endpoint, uncomment and modify: + ```properties + quarkus.otel.exporter.otlp.endpoint=http://localhost:4317 + ``` + +For more information, see the [OpenTelemetry extras module documentation](../../../extras/opentelemetry/README.md). + ## Notes - Make sure the Java server is running before starting the Python client. diff --git a/examples/helloworld/server/pom.xml b/examples/helloworld/server/pom.xml index 584690dce..66d73fd79 100644 --- a/examples/helloworld/server/pom.xml +++ b/examples/helloworld/server/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-server - Java SDK A2A Examples + Java SDK A2A Examples - HelloWorld Server Examples for the Java SDK for the Agent2Agent Protocol (A2A) @@ -25,9 +25,8 @@ a2a-java-sdk-reference-jsonrpc - io.quarkus - quarkus-resteasy-jackson - provided + io.github.a2asdk + a2a-java-sdk-transport-jsonrpc jakarta.enterprise @@ -55,7 +54,31 @@ + + --add-opens=java.base/java.lang=ALL-UNNAMED + + + + + opentelemetry + + + io.github.a2asdk + a2a-java-sdk-opentelemetry + + + io.quarkus + quarkus-opentelemetry + + + io.quarkus + quarkus-observability-devservices-lgtm + provided + + + + \ No newline at end of file diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java index 66f1c7fc4..07c4206ef 100644 --- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java +++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java @@ -9,6 +9,7 @@ import jakarta.enterprise.inject.Produces; import io.a2a.server.PublicAgentCard; +import io.a2a.server.interceptors.Trace; import io.a2a.spec.AgentCapabilities; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentInterface; @@ -17,6 +18,7 @@ @ApplicationScoped public class AgentCardProducer { + @Trace @Produces @PublicAgentCard public AgentCard agentCard() { @@ -26,7 +28,7 @@ public AgentCard agentCard() { .name("Hello World Agent") .description("Just a hello world agent") .supportedInterfaces(Collections.singletonList( - new AgentInterface("jsonrpc", "http://localhost:9999"))) + new AgentInterface("JSONRPC", "http://localhost:9999"))) .version("1.0.0") .documentationUrl("http://example.com/docs") .capabilities(AgentCapabilities.builder() diff --git a/examples/helloworld/server/src/main/resources/application.properties b/examples/helloworld/server/src/main/resources/application.properties index a2452b339..79aa4eb8c 100644 --- a/examples/helloworld/server/src/main/resources/application.properties +++ b/examples/helloworld/server/src/main/resources/application.properties @@ -1 +1,9 @@ -%dev.quarkus.http.port=9999 \ No newline at end of file +%dev.quarkus.http.port=9999 + +# OpenTelemetry configuration +quarkus.otel.sdk.disabled=false +quarkus.observability.lgtm.grafana-port=3001 +quarkus.observability.lgtm.otel-grpc-port=5317 +quarkus.observability.lgtm.otel-http-port=5318 +#quarkus.otel.exporter.otlp.endpoint=http://localhost:4317 +#quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, parentId=%X{parentId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n \ No newline at end of file diff --git a/extras/opentelemetry/pom.xml b/extras/opentelemetry/pom.xml new file mode 100644 index 000000000..889401373 --- /dev/null +++ b/extras/opentelemetry/pom.xml @@ -0,0 +1,79 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 1.0.0.Alpha1-SNAPSHOT + ../../pom.xml + + + a2a-java-sdk-opentelemetry + + A2A Java SDK :: Extras :: Opentelemetry + Java SDK for the Agent2Agent Protocol (A2A) - Extras - Opentelemetry + + 2.0.1 + + + + + ${project.groupId} + a2a-java-sdk-server-common + + + ${project.groupId} + a2a-java-sdk-jsonrpc-common + + + ${project.groupId} + a2a-java-sdk-jsonrpc-common + + + ${project.groupId} + a2a-java-sdk-spec-grpc + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + jakarta.inject + jakarta.inject-api + + + org.eclipse.microprofile.telemetry + microprofile-telemetry-api + ${version.org.eclipse.microprofile.telemetry} + pom + provided + + + org.slf4j + slf4j-api + + + ${project.groupId} + a2a-java-sdk-client-transport-spi + + + org.junit.jupiter + junit-jupiter + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + + \ No newline at end of file diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java new file mode 100644 index 000000000..75ccd1faa --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java @@ -0,0 +1,322 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.client.transport.spi.ClientTransport; +import io.a2a.client.transport.spi.interceptors.ClientCallContext; +import io.a2a.jsonrpc.common.wrappers.ListTasksResult; +import io.a2a.spec.A2AClientException; +import io.a2a.spec.A2AMethods; +import io.a2a.spec.AgentCard; +import io.a2a.spec.DeleteTaskPushNotificationConfigParams; +import io.a2a.spec.EventKind; +import io.a2a.spec.GetTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; +import io.a2a.spec.ListTasksParams; +import io.a2a.spec.MessageSendParams; +import io.a2a.spec.StreamingEventKind; +import io.a2a.spec.Task; +import io.a2a.spec.TaskIdParams; +import io.a2a.spec.TaskPushNotificationConfig; +import io.a2a.spec.TaskQueryParams; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.jspecify.annotations.Nullable; + +public class OpenTelemetryClientTransport implements ClientTransport { + + private final Tracer tracer; + private final ClientTransport delegate; + private static final String REQUEST_TRACE_ATTRIBUTE = "request"; + private static final String RESPONSE_TRACE_ATTRIBUTE = "response"; + + public OpenTelemetryClientTransport(ClientTransport delegate, Tracer tracer) { + this.delegate = delegate; + this.tracer = tracer; + } + + /** + * Traces an operation that returns a value with a request parameter. + * + * @param operationName the name of the operation for the span + * @param request the request object + * @param operation the operation to execute + * @param responseFormatter optional function to format the response for span attribute (defaults to toString) + * @param the request type + * @param the return type + * @return the result of the operation + * @throws A2AClientException if the operation fails + */ + private T traceOperation(String operationName, R request, + ThrowingSupplier operation, + @Nullable Function responseFormatter) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder(operationName).setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute(REQUEST_TRACE_ATTRIBUTE, request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + T result = operation.get(); + if (result != null) { + String responseValue = responseFormatter != null ? responseFormatter.apply(result) : result.toString(); + span.setAttribute(RESPONSE_TRACE_ATTRIBUTE, responseValue); + span.setStatus(StatusCode.OK); + } + return result; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + /** + * Traces an operation that returns a value without a request parameter. + * + * @param operationName the name of the operation for the span + * @param operation the operation to execute + * @param the return type + * @return the result of the operation + * @throws A2AClientException if the operation fails + */ + private T traceOperationNoRequest(String operationName, ThrowingSupplier operation) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder(operationName).setSpanKind(SpanKind.CLIENT); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + T result = operation.get(); + if (result != null) { + span.setAttribute(RESPONSE_TRACE_ATTRIBUTE, result.toString()); + span.setStatus(StatusCode.OK); + } + return result; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + /** + * Traces a void operation with a request parameter. + * + * @param operationName the name of the operation for the span + * @param request the request object + * @param operation the operation to execute + * @param the request type + * @throws A2AClientException if the operation fails + */ + private void traceVoidOperation(String operationName, R request, ThrowingRunnable operation) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder(operationName).setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute(REQUEST_TRACE_ATTRIBUTE, request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + operation.run(); + span.setStatus(StatusCode.OK); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + /** + * Traces a streaming operation with wrapped consumers. + * + * @param operationName the name of the operation for the span + * @param request the request object + * @param eventConsumer the event consumer + * @param errorConsumer the error consumer + * @param operation the operation to execute with wrapped consumers + * @param the request type + * @throws A2AClientException if the operation fails + */ + private void traceStreamingOperation(String operationName, R request, + Consumer eventConsumer, + Consumer errorConsumer, + StreamingOperation operation) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder(operationName).setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute(REQUEST_TRACE_ATTRIBUTE, request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + operation.execute( + new OpenTelemetryEventConsumer(operationName + "-event", eventConsumer, tracer, span.getSpanContext()), + new OpenTelemetryErrorConsumer(operationName + "-error", errorConsumer, tracer, span.getSpanContext()) + ); + span.setStatus(StatusCode.OK); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.SEND_MESSAGE_METHOD, request, () -> delegate.sendMessage(request, context), null); + } + + @Override + public void sendMessageStreaming(MessageSendParams request, Consumer eventConsumer, + Consumer errorConsumer, @Nullable ClientCallContext context) throws A2AClientException { + traceStreamingOperation(A2AMethods.SEND_STREAMING_MESSAGE_METHOD, request, eventConsumer, errorConsumer, + (wrappedEvent, wrappedError) -> delegate.sendMessageStreaming(request, wrappedEvent, wrappedError, context)); + } + + @Override + public Task getTask(TaskQueryParams request, @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.GET_TASK_METHOD, request, () -> delegate.getTask(request, context), null); + } + + @Override + public Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.CANCEL_TASK_METHOD, request, () -> delegate.cancelTask(request, context), null); + } + + @Override + public ListTasksResult listTasks(ListTasksParams request, @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.LIST_TASK_METHOD, request, () -> delegate.listTasks(request, context), null); + } + + @Override + public TaskPushNotificationConfig setTaskPushNotificationConfiguration(TaskPushNotificationConfig request, + @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, request, + () -> delegate.setTaskPushNotificationConfiguration(request, context), null); + } + + @Override + public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request, + @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, request, + () -> delegate.getTaskPushNotificationConfiguration(request, context), null); + } + + @Override + public ListTaskPushNotificationConfigResult listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams request, + @Nullable ClientCallContext context) throws A2AClientException { + return traceOperation(A2AMethods.LIST_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, request, + () -> delegate.listTaskPushNotificationConfigurations(request, context), + result -> result.configs().stream().map(TaskPushNotificationConfig::toString).collect(Collectors.joining(","))); + } + + @Override + public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationConfigParams request, + @Nullable ClientCallContext context) throws A2AClientException { + traceVoidOperation(A2AMethods.DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, request, + () -> delegate.deleteTaskPushNotificationConfigurations(request, context)); + } + + @Override + public void resubscribe(TaskIdParams request, Consumer eventConsumer, + Consumer errorConsumer, @Nullable ClientCallContext context) throws A2AClientException { + traceStreamingOperation(A2AMethods.SUBSCRIBE_TO_TASK_METHOD, request, eventConsumer, errorConsumer, + (wrappedEvent, wrappedError) -> delegate.resubscribe(request, wrappedEvent, wrappedError, context)); + } + + @Override + public AgentCard getAgentCard(@Nullable ClientCallContext context) throws A2AClientException { + return traceOperationNoRequest(A2AMethods.GET_EXTENDED_AGENT_CARD_METHOD, () -> delegate.getAgentCard(context)); + } + + @Override + public void close() { + delegate.close(); + } + + /** + * Functional interface for operations that may throw A2AClientException. + * + * @param the return type + */ + @FunctionalInterface + private interface ThrowingSupplier { + T get() throws A2AClientException; + } + + /** + * Functional interface for void operations that may throw A2AClientException. + */ + @FunctionalInterface + private interface ThrowingRunnable { + void run() throws A2AClientException; + } + + /** + * Functional interface for streaming operations with wrapped consumers. + */ + @FunctionalInterface + private interface StreamingOperation { + void execute(Consumer eventConsumer, Consumer errorConsumer) throws A2AClientException; + } + + private static class OpenTelemetryEventConsumer implements Consumer { + + private final Consumer delegate; + private final Tracer tracer; + private final SpanContext context; + private final String name; + + public OpenTelemetryEventConsumer(String name, Consumer delegate, Tracer tracer, SpanContext context) { + this.delegate = delegate; + this.tracer = tracer; + this.context = context; + this.name = name; + } + + @Override + public void accept(StreamingEventKind t) { + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("gen_ai.agent.a2a.streaming-event", t.toString()); + spanBuilder.addLink(context); + Span span = spanBuilder.startSpan(); + try { + delegate.accept(t); + span.setStatus(StatusCode.OK); + } finally { + span.end(); + } + } + } + + private static class OpenTelemetryErrorConsumer implements Consumer { + + private final Consumer delegate; + private final Tracer tracer; + private final SpanContext context; + private final String name; + + public OpenTelemetryErrorConsumer(String name, Consumer delegate, Tracer tracer, SpanContext context) { + this.delegate = delegate; + this.tracer = tracer; + this.context = context; + this.name = name; + } + + @Override + public void accept(Throwable t) { + if (t == null) { + return; + } + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.CLIENT); + spanBuilder.addLink(context); + Span span = spanBuilder.startSpan(); + try { + span.setStatus(StatusCode.ERROR, t.getMessage()); + delegate.accept(t); + } finally { + span.end(); + } + } + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java new file mode 100644 index 000000000..be5294299 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java @@ -0,0 +1,45 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.client.transport.spi.ClientTransport; +import io.a2a.client.transport.spi.ClientTransportConfig; +import io.a2a.client.transport.spi.ClientTransportWrapper; +import io.opentelemetry.api.trace.Tracer; + +/** + * OpenTelemetry client transport wrapper that adds distributed tracing to A2A client calls. + * + *

This wrapper is automatically discovered via Java's ServiceLoader mechanism. + * To enable tracing, add a {@link Tracer} instance to the transport configuration: + *

{@code
+ * ClientTransportConfig config = new JSONRPCTransportConfig();
+ * config.setParameters(Map.of(
+ *     OpenTelemetryClientTransportFactory.OTEL_TRACER_KEY,
+ *     openTelemetry.getTracer("my-service")
+ * ));
+ * }
+ */ +public class OpenTelemetryClientTransportWrapper implements ClientTransportWrapper { + + /** + * Configuration key for the OpenTelemetry Tracer instance. + * Value must be of type {@link Tracer}. + */ + public static final String OTEL_TRACER_KEY = "io.a2a.extras.opentelemetry.Tracer"; + + @Override + public ClientTransport wrap(ClientTransport transport, ClientTransportConfig config) { + Object tracerObj = config.getParameters().get(OTEL_TRACER_KEY); + if (tracerObj instanceof Tracer tracer) { + return new OpenTelemetryClientTransport(transport, tracer); + } + // No tracer configured, return unwrapped transport + return transport; + } + + @Override + public int priority() { + // Observability/tracing should be in the middle priority range + // so it can observe other wrappers but doesn't interfere with security + return 500; + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java new file mode 100644 index 000000000..6e3bbef57 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java @@ -0,0 +1,144 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.server.interceptors.Kind; +import io.a2a.server.interceptors.NoAttributeExtractor; +import io.a2a.server.interceptors.Trace; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import jakarta.annotation.Priority; +import jakarta.inject.Inject; +import jakarta.interceptor.AroundInvoke; +import jakarta.interceptor.Interceptor; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Jakarta EE CDI interceptor for @Trace annotation. + * Integrates with OpenTelemetry to create spans for traced methods. + */ +@Trace() +@Interceptor +@Priority(Interceptor.Priority.APPLICATION) +public class SpanInterceptor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SpanInterceptor.class); + + /** + * Common CDI proxy suffixes used by various CDI implementations. + * These are appended to class names when CDI creates proxies for intercepted beans. + */ + private static final String[] CDI_PROXY_SUFFIXES = { + "_Subclass", // Quarkus/Weld subclass proxies + "_ClientProxy", // Weld client proxies + "$$_WeldSubclass", // Weld subclass alternative + "_$$_javassist_", // Javassist-based proxies + "$Proxy$_$$_" // Other CDI proxy patterns + }; + + @Inject + private Tracer tracer; + + @AroundInvoke + public Object trace(jakarta.interceptor.InvocationContext jakartaContext) throws Exception { + // Convert Jakarta InvocationContext to our custom InvocationContext + io.a2a.server.interceptors.InvocationContext customContext + = new io.a2a.server.interceptors.InvocationContext( + jakartaContext.getTarget(), + jakartaContext.getMethod(), + jakartaContext.getParameters() + ); + + Kind kind = jakartaContext + .getMethod() + .getAnnotation(Trace.class) + .kind(); + Class>>> extractorClass + = jakartaContext.getMethod() + .getAnnotation(Trace.class) + .extractor(); + + // Get the actual class name, stripping CDI proxy suffixes for cleaner span names + // CDI implementations like Weld/Quarkus add suffixes to proxied classes + String rawClassName = jakartaContext.getTarget().getClass().getName(); + String className = stripCdiProxySuffix(rawClassName); + // Use raw class name as fallback if stripping returns null (shouldn't happen in practice) + String name = (className != null ? className : rawClassName) + '#' + jakartaContext.getMethod().getName(); + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.valueOf(kind.toString())); + + if (extractorClass != null && !extractorClass.equals(NoAttributeExtractor.class)) { + try { + Supplier>> supplier + = extractorClass.getDeclaredConstructor().newInstance(); + Map attributes = supplier.get().apply(customContext); + for (Map.Entry attribute : attributes.entrySet()) { + spanBuilder.setAttribute(attribute.getKey(), attribute.getValue()); + } + } catch (Exception e) { + LOGGER.warn("Failed to instantiate attribute extractor {}: {}", + extractorClass.getName(), e.getMessage(), e); + } + } + + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + Object ret = jakartaContext.proceed(); + span.setStatus(StatusCode.OK); + if (ret != null) { + span.setAttribute("gen_ai.agent.a2a.response", ret.toString()); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + /** + * Strips known CDI proxy suffixes from class names to get the original class name. + *

+ * CDI implementations (Weld, Quarkus, etc.) create proxy classes for beans with interceptors, + * appending various suffixes to the original class name. This method removes these suffixes + * to provide cleaner, more readable span names in OpenTelemetry traces. + *

+ * For example: + *

    + *
  • {@code com.example.MyService_Subclass} → {@code com.example.MyService}
  • + *
  • {@code com.example.MyService_ClientProxy} → {@code com.example.MyService}
  • + *
  • {@code com.example.MyService$$_WeldSubclass} → {@code com.example.MyService}
  • + *
+ * + * @param className the potentially proxied class name + * @return the class name with CDI proxy suffixes removed, or the original name if no suffix found, or null if input is null + */ + private @Nullable String stripCdiProxySuffix(@Nullable String className) { + if (className == null) { + return null; + } + + // Check each known CDI proxy suffix and remove it if found + for (String suffix : CDI_PROXY_SUFFIXES) { + if (className.endsWith(suffix)) { + return className.substring(0, className.length() - suffix.length()); + } + // Also handle cases where the suffix appears in the middle (e.g., _$$_javassist_N) + int suffixIndex = className.indexOf(suffix); + if (suffixIndex > 0) { + return className.substring(0, suffixIndex); + } + } + + return className; + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java new file mode 100644 index 000000000..fdd7057d5 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java @@ -0,0 +1,5 @@ +@NullMarked +package io.a2a.extras.opentelemetry; + +import org.jspecify.annotations.NullMarked; + diff --git a/extras/opentelemetry/src/main/resources/META-INF/beans.xml b/extras/opentelemetry/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..9b2940fc2 --- /dev/null +++ b/extras/opentelemetry/src/main/resources/META-INF/beans.xml @@ -0,0 +1,6 @@ + + + \ No newline at end of file diff --git a/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper b/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper new file mode 100644 index 000000000..f5312bf6b --- /dev/null +++ b/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper @@ -0,0 +1 @@ +io.a2a.extras.opentelemetry.OpenTelemetryClientTransportWrapper diff --git a/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportTest.java b/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportTest.java new file mode 100644 index 000000000..29818e659 --- /dev/null +++ b/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportTest.java @@ -0,0 +1,480 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.client.transport.spi.ClientTransport; +import io.a2a.client.transport.spi.interceptors.ClientCallContext; +import io.a2a.jsonrpc.common.wrappers.ListTasksResult; +import io.a2a.spec.A2AClientException; +import io.a2a.spec.AgentCard; +import io.a2a.spec.DeleteTaskPushNotificationConfigParams; +import io.a2a.spec.EventKind; +import io.a2a.spec.GetTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigResult; +import io.a2a.spec.ListTasksParams; +import io.a2a.spec.Message; +import io.a2a.spec.MessageSendParams; +import io.a2a.spec.StreamingEventKind; +import io.a2a.spec.TextPart; +import io.a2a.spec.Task; +import io.a2a.spec.TaskIdParams; +import io.a2a.spec.TaskPushNotificationConfig; +import io.a2a.spec.TaskQueryParams; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.List; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import io.a2a.spec.A2AMethods; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class OpenTelemetryClientTransportTest { + + @Mock + private ClientTransport delegate; + + @Mock + private Tracer tracer; + + @Mock + private SpanBuilder spanBuilder; + + @Mock + private Span span; + + @Mock + private Scope scope; + + @Mock + private SpanContext spanContext; + + @Mock + private ClientCallContext context; + + private OpenTelemetryClientTransport transport; + + @BeforeEach + void setUp() { + when(tracer.spanBuilder(anyString())).thenReturn(spanBuilder); + when(spanBuilder.setSpanKind(any(SpanKind.class))).thenReturn(spanBuilder); + when(spanBuilder.setAttribute(anyString(), anyString())).thenReturn(spanBuilder); + when(spanBuilder.addLink(any(SpanContext.class))).thenReturn(spanBuilder); + when(spanBuilder.startSpan()).thenReturn(span); + when(span.makeCurrent()).thenReturn(scope); + when(span.getSpanContext()).thenReturn(spanContext); + + transport = new OpenTelemetryClientTransport(delegate, tracer); + } + + @Test + void testSendMessage_Success() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + EventKind expectedResult = mock(EventKind.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.sendMessage(request, context)).thenReturn(expectedResult); + + EventKind result = transport.sendMessage(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.SEND_MESSAGE_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + verify(scope).close(); + } + + @Test + void testSendMessage_NullResponse() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + when(delegate.sendMessage(request, context)).thenReturn(null); + + EventKind result = transport.sendMessage(request, context); + + assertNull(result); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(spanBuilder, never()).setAttribute(eq("response"), anyString()); + verify(span, never()).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testSendMessage_ThrowsException() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + A2AClientException expectedException = new A2AClientException("Test error"); + when(delegate.sendMessage(request, context)).thenThrow(expectedException); + + A2AClientException exception = assertThrows(A2AClientException.class, + () -> transport.sendMessage(request, context)); + + assertEquals(expectedException, exception); + verify(span).setStatus(StatusCode.ERROR, "Test error"); + verify(span).end(); + verify(scope).close(); + } + + @Test + void testSendMessageStreaming() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + Consumer eventConsumer = mock(Consumer.class); + Consumer errorConsumer = mock(Consumer.class); + + transport.sendMessageStreaming(request, eventConsumer, errorConsumer, context); + + verify(tracer).spanBuilder(A2AMethods.SEND_STREAMING_MESSAGE_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + + ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + ArgumentCaptor> errorConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(delegate).sendMessageStreaming(eq(request), eventConsumerCaptor.capture(), + errorConsumerCaptor.capture(), eq(context)); + + assertNotNull(eventConsumerCaptor.getValue()); + assertNotNull(errorConsumerCaptor.getValue()); + } + + @Test + void testGetTask_Success() throws A2AClientException { + TaskQueryParams request = mock(TaskQueryParams.class); + Task expectedResult = mock(Task.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.getTask(request, context)).thenReturn(expectedResult); + + Task result = transport.getTask(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.GET_TASK_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testCancelTask_Success() throws A2AClientException { + TaskIdParams request = mock(TaskIdParams.class); + Task expectedResult = mock(Task.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.cancelTask(request, context)).thenReturn(expectedResult); + + Task result = transport.cancelTask(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.CANCEL_TASK_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testListTasks_Success() throws A2AClientException { + ListTasksParams request = mock(ListTasksParams.class); + ListTasksResult expectedResult = mock(ListTasksResult.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.listTasks(request, context)).thenReturn(expectedResult); + + ListTasksResult result = transport.listTasks(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.LIST_TASK_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testSetTaskPushNotificationConfiguration_Success() throws A2AClientException { + TaskPushNotificationConfig request = mock(TaskPushNotificationConfig.class); + TaskPushNotificationConfig expectedResult = mock(TaskPushNotificationConfig.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.setTaskPushNotificationConfiguration(request, context)).thenReturn(expectedResult); + + TaskPushNotificationConfig result = transport.setTaskPushNotificationConfiguration(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testGetTaskPushNotificationConfiguration_Success() throws A2AClientException { + GetTaskPushNotificationConfigParams request = mock(GetTaskPushNotificationConfigParams.class); + TaskPushNotificationConfig expectedResult = mock(TaskPushNotificationConfig.class); + when(request.toString()).thenReturn("request-string"); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.getTaskPushNotificationConfiguration(request, context)).thenReturn(expectedResult); + + TaskPushNotificationConfig result = transport.getTaskPushNotificationConfiguration(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testListTaskPushNotificationConfigurations_Success() throws A2AClientException { + ListTaskPushNotificationConfigParams request = mock(ListTaskPushNotificationConfigParams.class); + TaskPushNotificationConfig config1 = mock(TaskPushNotificationConfig.class); + TaskPushNotificationConfig config2 = mock(TaskPushNotificationConfig.class); + when(config1.toString()).thenReturn("config1"); + when(config2.toString()).thenReturn("config2"); + ListTaskPushNotificationConfigResult expectedResult = new ListTaskPushNotificationConfigResult(List.of(config1, config2)); + when(request.toString()).thenReturn("request-string"); + when(delegate.listTaskPushNotificationConfigurations(request, context)).thenReturn(expectedResult); + + ListTaskPushNotificationConfigResult result = transport.listTaskPushNotificationConfigurations(request, context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.LIST_TASK_PUSH_NOTIFICATION_CONFIG_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setAttribute("response", "config1,config2"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testDeleteTaskPushNotificationConfigurations_Success() throws A2AClientException { + DeleteTaskPushNotificationConfigParams request = mock(DeleteTaskPushNotificationConfigParams.class); + when(request.toString()).thenReturn("request-string"); + doNothing().when(delegate).deleteTaskPushNotificationConfigurations(request, context); + + transport.deleteTaskPushNotificationConfigurations(request, context); + + verify(tracer).spanBuilder(A2AMethods.DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + verify(delegate).deleteTaskPushNotificationConfigurations(request, context); + } + + @Test + void testResubscribe() throws A2AClientException { + TaskIdParams request = mock(TaskIdParams.class); + when(request.toString()).thenReturn("request-string"); + Consumer eventConsumer = mock(Consumer.class); + Consumer errorConsumer = mock(Consumer.class); + + transport.resubscribe(request, eventConsumer, errorConsumer, context); + + verify(tracer).spanBuilder(A2AMethods.SUBSCRIBE_TO_TASK_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(spanBuilder).setAttribute("request", "request-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + + ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + ArgumentCaptor> errorConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(delegate).resubscribe(eq(request), eventConsumerCaptor.capture(), + errorConsumerCaptor.capture(), eq(context)); + + assertNotNull(eventConsumerCaptor.getValue()); + assertNotNull(errorConsumerCaptor.getValue()); + } + + @Test + void testGetAgentCard_Success() throws A2AClientException { + AgentCard expectedResult = mock(AgentCard.class); + when(expectedResult.toString()).thenReturn("response-string"); + when(delegate.getAgentCard(context)).thenReturn(expectedResult); + + AgentCard result = transport.getAgentCard(context); + + assertEquals(expectedResult, result); + verify(tracer).spanBuilder(A2AMethods.GET_EXTENDED_AGENT_CARD_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(span).setAttribute("response", "response-string"); + verify(span).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testGetAgentCard_NullResponse() throws A2AClientException { + when(delegate.getAgentCard(context)).thenReturn(null); + + AgentCard result = transport.getAgentCard(context); + + assertNull(result); + verify(tracer).spanBuilder(A2AMethods.GET_EXTENDED_AGENT_CARD_METHOD); + verify(spanBuilder).setSpanKind(SpanKind.CLIENT); + verify(span, never()).setAttribute(eq("response"), anyString()); + verify(span, never()).setStatus(StatusCode.OK); + verify(span).end(); + } + + @Test + void testClose() { + transport.close(); + verify(delegate).close(); + } + + @Test + void testEventConsumer_ThroughSendMessageStreaming() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + + SpanBuilder eventSpanBuilder = mock(SpanBuilder.class); + Span eventSpan = mock(Span.class); + when(tracer.spanBuilder(A2AMethods.SEND_STREAMING_MESSAGE_METHOD + "-event")).thenReturn(eventSpanBuilder); + when(eventSpanBuilder.setSpanKind(any(SpanKind.class))).thenReturn(eventSpanBuilder); + when(eventSpanBuilder.setAttribute(anyString(), anyString())).thenReturn(eventSpanBuilder); + when(eventSpanBuilder.addLink(any(SpanContext.class))).thenReturn(eventSpanBuilder); + when(eventSpanBuilder.startSpan()).thenReturn(eventSpan); + + ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + Consumer originalConsumer = mock(Consumer.class); + + transport.sendMessageStreaming(request, originalConsumer, mock(Consumer.class), context); + + verify(delegate).sendMessageStreaming(eq(request), eventConsumerCaptor.capture(), any(Consumer.class), eq(context)); + + Message event = Message.builder() + .messageId("test-id") + .taskId("task-id") + .role(Message.Role.USER) + .parts(List.of(new TextPart("test content"))) + .build(); + + eventConsumerCaptor.getValue().accept(event); + + verify(tracer).spanBuilder(A2AMethods.SEND_STREAMING_MESSAGE_METHOD + "-event"); + verify(eventSpan).setStatus(StatusCode.OK); + verify(eventSpan).end(); + verify(originalConsumer).accept(event); + } + + @Test + void testErrorConsumer_ThroughSendMessageStreaming() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + + SpanBuilder errorSpanBuilder = mock(SpanBuilder.class); + Span errorSpan = mock(Span.class); + when(tracer.spanBuilder(A2AMethods.SEND_STREAMING_MESSAGE_METHOD + "-error")).thenReturn(errorSpanBuilder); + when(errorSpanBuilder.setSpanKind(any(SpanKind.class))).thenReturn(errorSpanBuilder); + when(errorSpanBuilder.addLink(any(SpanContext.class))).thenReturn(errorSpanBuilder); + when(errorSpanBuilder.startSpan()).thenReturn(errorSpan); + + ArgumentCaptor> errorConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + Consumer originalConsumer = mock(Consumer.class); + + transport.sendMessageStreaming(request, mock(Consumer.class), originalConsumer, context); + + verify(delegate).sendMessageStreaming(eq(request), any(Consumer.class), errorConsumerCaptor.capture(), eq(context)); + + Throwable error = new RuntimeException("Test error"); + + errorConsumerCaptor.getValue().accept(error); + + verify(tracer).spanBuilder(A2AMethods.SEND_STREAMING_MESSAGE_METHOD + "-error"); + verify(errorSpan).setStatus(StatusCode.ERROR, "Test error"); + verify(errorSpan).end(); + verify(originalConsumer).accept(error); + } + + @Test + void testErrorConsumer_WithNullThrowable() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + + ArgumentCaptor> errorConsumerCaptor = ArgumentCaptor.forClass(Consumer.class); + Consumer originalConsumer = mock(Consumer.class); + + transport.sendMessageStreaming(request, mock(Consumer.class), originalConsumer, context); + + verify(delegate).sendMessageStreaming(eq(request), any(Consumer.class), errorConsumerCaptor.capture(), eq(context)); + + errorConsumerCaptor.getValue().accept(null); + + verify(originalConsumer, never()).accept(any()); + } + + @Test + void testDeleteTaskPushNotificationConfigurations_ThrowsException() throws A2AClientException { + DeleteTaskPushNotificationConfigParams request = mock(DeleteTaskPushNotificationConfigParams.class); + when(request.toString()).thenReturn("request-string"); + A2AClientException expectedException = new A2AClientException("Delete failed"); + doThrow(expectedException).when(delegate).deleteTaskPushNotificationConfigurations(request, context); + + A2AClientException exception = assertThrows(A2AClientException.class, + () -> transport.deleteTaskPushNotificationConfigurations(request, context)); + + assertEquals(expectedException, exception); + verify(span).setStatus(StatusCode.ERROR, "Delete failed"); + verify(span).end(); + } + + @Test + void testResubscribe_ThrowsException() throws A2AClientException { + TaskIdParams request = mock(TaskIdParams.class); + when(request.toString()).thenReturn("request-string"); + Consumer eventConsumer = mock(Consumer.class); + Consumer errorConsumer = mock(Consumer.class); + A2AClientException expectedException = new A2AClientException("Resubscribe failed"); + doThrow(expectedException).when(delegate).resubscribe(any(TaskIdParams.class), any(Consumer.class), + any(Consumer.class), any(ClientCallContext.class)); + + A2AClientException exception = assertThrows(A2AClientException.class, + () -> transport.resubscribe(request, eventConsumer, errorConsumer, context)); + + assertEquals(expectedException, exception); + verify(span).setStatus(StatusCode.ERROR, "Resubscribe failed"); + verify(span).end(); + } + + @Test + void testSendMessageStreaming_ThrowsException() throws A2AClientException { + MessageSendParams request = mock(MessageSendParams.class); + when(request.toString()).thenReturn("request-string"); + Consumer eventConsumer = mock(Consumer.class); + Consumer errorConsumer = mock(Consumer.class); + A2AClientException expectedException = new A2AClientException("Streaming failed"); + doThrow(expectedException).when(delegate).sendMessageStreaming(any(MessageSendParams.class), any(Consumer.class), + any(Consumer.class), any(ClientCallContext.class)); + + A2AClientException exception = assertThrows(A2AClientException.class, + () -> transport.sendMessageStreaming(request, eventConsumer, errorConsumer, context)); + + assertEquals(expectedException, exception); + verify(span).setStatus(StatusCode.ERROR, "Streaming failed"); + verify(span).end(); + } +} diff --git a/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/SpanInterceptorTest.java b/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/SpanInterceptorTest.java new file mode 100644 index 000000000..7bf6ba1f1 --- /dev/null +++ b/extras/opentelemetry/src/test/java/io/a2a/extras/opentelemetry/SpanInterceptorTest.java @@ -0,0 +1,100 @@ +package io.a2a.extras.opentelemetry; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import static org.junit.jupiter.api.Assertions.*; + +class SpanInterceptorTest { + + private SpanInterceptor interceptor = new SpanInterceptor(); + + /** + * Test the stripCdiProxySuffix method using reflection since it's private. + */ + private String invokeStripCdiProxySuffix(String className) throws Exception { + Method method = SpanInterceptor.class.getDeclaredMethod("stripCdiProxySuffix", String.class); + method.setAccessible(true); + return (String) method.invoke(interceptor, className); + } + + @Test + void testStripCdiProxySuffix_Subclass() throws Exception { + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService_Subclass")); + } + + @Test + void testStripCdiProxySuffix_ClientProxy() throws Exception { + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService_ClientProxy")); + } + + @Test + void testStripCdiProxySuffix_WeldSubclass() throws Exception { + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService$$_WeldSubclass")); + } + + @Test + void testStripCdiProxySuffix_Javassist() throws Exception { + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService_$$_javassist_1")); + } + + @Test + void testStripCdiProxySuffix_ProxyPattern() throws Exception { + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService$Proxy$_$$_123")); + } + + @Test + void testStripCdiProxySuffix_NoSuffix() throws Exception { + // Class without CDI proxy suffix should remain unchanged + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService")); + } + + @Test + void testStripCdiProxySuffix_Null() throws Exception { + // Null input should return null + assertNull(invokeStripCdiProxySuffix(null)); + } + + @Test + void testStripCdiProxySuffix_EmptyString() throws Exception { + // Empty string should remain unchanged + assertEquals("", invokeStripCdiProxySuffix("")); + } + + @Test + void testStripCdiProxySuffix_MultipleSuffixes() throws Exception { + // Only the first matching suffix should be removed + // This tests that we stop at the first match + assertEquals("com.example.MyService", + invokeStripCdiProxySuffix("com.example.MyService_Subclass_ClientProxy")); + } + + @Test + void testStripCdiProxySuffix_InnerClass() throws Exception { + // Inner class with proxy suffix + assertEquals("com.example.OuterClass$InnerService", + invokeStripCdiProxySuffix("com.example.OuterClass$InnerService_Subclass")); + } + + @Test + void testStripCdiProxySuffix_PackageWithSimilarNames() throws Exception { + // Ensure we don't accidentally strip part of the package name + assertEquals("com.example.subclass.MyService", + invokeStripCdiProxySuffix("com.example.subclass.MyService_Subclass")); + } + + @Test + void testStripCdiProxySuffix_ShortClassName() throws Exception { + // Very short class name + assertEquals("A", + invokeStripCdiProxySuffix("A_Subclass")); + } +} diff --git a/pom.xml b/pom.xml index 643b56e53..26b2fa35c 100644 --- a/pom.xml +++ b/pom.xml @@ -184,6 +184,16 @@ a2a-java-sdk-reference-rest ${project.version}
+ + io.github.a2asdk + a2a-java-sdk-opentelemetry + ${project.version} + + + io.github.a2asdk + a2a-java-sdk-opentelemetry-spring + ${project.version} + io.grpc grpc-bom @@ -507,6 +517,7 @@ examples/helloworld examples/cloud-deployment/server extras/common + extras/opentelemetry extras/task-store-database-jpa extras/push-notification-config-store-database-jpa extras/queue-manager-replicated diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java index 98e40585b..3b0421b44 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java @@ -14,7 +14,7 @@ /** * gRPC server interceptor that captures request metadata and context information, * providing equivalent functionality to Python's grpc.aio.ServicerContext. - * + * * This interceptor: * - Extracts A2A extension headers from incoming requests * - Captures ServerCall and Metadata for rich context access @@ -24,7 +24,6 @@ @ApplicationScoped public class A2AExtensionsInterceptor implements ServerInterceptor { - @Override public ServerCall.Listener interceptCall( ServerCall serverCall, @@ -43,12 +42,14 @@ public ServerCall.Listener interceptCall( // Create enhanced context with rich information (equivalent to Python's ServicerContext) Context context = Context.current() - // Store complete metadata for full header access - .withValue(GrpcContextKeys.METADATA_KEY, metadata) - // Store method name (equivalent to Python's context.method()) - .withValue(GrpcContextKeys.METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName()) - // Store peer information for client connection details - .withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall)); + // Store complete metadata for full header access + .withValue(GrpcContextKeys.METADATA_KEY, metadata) + // Store Grpc method name + .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName()) + // Store method name (equivalent to Python's context.method()) + .withValue(GrpcContextKeys.METHOD_NAME_KEY, GrpcContextKeys.METHOD_MAPPING.get(serverCall.getMethodDescriptor().getBareMethodName())) + // Store peer information for client connection details + .withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall)); // Store A2A version if present if (version != null) { @@ -66,7 +67,7 @@ public ServerCall.Listener interceptCall( /** * Safely extracts peer information from the ServerCall. - * + * * @param serverCall the gRPC ServerCall * @return peer information string, or "unknown" if not available */ diff --git a/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java new file mode 100644 index 000000000..f67811e5d --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java @@ -0,0 +1,51 @@ +package io.a2a.server.interceptors; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * Represents the context of a method invocation for interceptors. + *

+ * This record captures the target object, method being invoked, and parameters + * passed to the method. It's used by interceptors to extract attributes and + * execute the actual method invocation. + *

+ * The parameters array is defensively copied to prevent external modification, + * ensuring immutability of the InvocationContext after construction. + * + * @param target the object on which the method is being invoked (never null) + * @param method the method being invoked (may be null if method resolution fails) + * @param parameters the parameters passed to the method (may be null or empty) + */ +public record InvocationContext(@NonNull Object target, @NonNull Method method, Object @Nullable[] parameters) { + + /** + * Compact constructor with validation and defensive copying. + */ + public InvocationContext { + if (target == null) { + throw new IllegalArgumentException("target cannot be null"); + } + if (method == null) { + throw new IllegalArgumentException("method cannot be null"); + } + } + + /** + * Invokes the method on the target with the provided parameters. + * + * @return the result of the method invocation + * @throws IllegalStateException if method is null + * @throws IllegalAccessException if the method is not accessible + * @throws IllegalArgumentException if parameters don't match method signature + * @throws InvocationTargetException if the invoked method throws an exception + */ + public @Nullable Object proceed() throws IllegalAccessException, + IllegalArgumentException, + InvocationTargetException { + return method.invoke(target, parameters); + } +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Kind.java b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java new file mode 100644 index 000000000..51e093908 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java @@ -0,0 +1,5 @@ +package io.a2a.server.interceptors; + +public enum Kind { + INTERNAL, SERVER, CLIENT, PRODUCER, CONSUMER; +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java new file mode 100644 index 000000000..8e027b46b --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java @@ -0,0 +1,14 @@ +package io.a2a.server.interceptors; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +public class NoAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> Collections.emptyMap(); + } +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Trace.java b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java new file mode 100644 index 000000000..7cdbad288 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java @@ -0,0 +1,27 @@ +package io.a2a.server.interceptors; + +import jakarta.enterprise.util.Nonbinding; +import jakarta.interceptor.InterceptorBinding; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Framework-agnostic annotation for method tracing. + * Works with both Jakarta EE CDI interceptors and Spring AOP. + */ +@InterceptorBinding +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@Inherited +public @interface Trace { + @Nonbinding + Kind kind() default Kind.SERVER; + @Nonbinding + Class>>> extractor() default NoAttributeExtractor.class; +} diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index e19f7920d..4f55c7552 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -1,5 +1,6 @@ package io.a2a.server.requesthandlers; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.convertingProcessor; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import static io.a2a.server.util.async.AsyncUtils.processor; @@ -37,6 +38,7 @@ import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.QueueManager; import io.a2a.server.events.TaskQueueExistsException; +import io.a2a.server.interceptors.Trace; import io.a2a.server.tasks.PushNotificationConfigStore; import io.a2a.server.tasks.PushNotificationSender; import io.a2a.server.tasks.ResultAggregator; @@ -278,6 +280,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws A2AError { LOGGER.debug("onGetTask {}", params.id()); Task task = taskStore.get(params.id()); @@ -311,6 +314,7 @@ private static Task limitTaskHistory(Task task, @Nullable Integer historyLength) } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws A2AError { LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}", params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter()); @@ -333,6 +337,7 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws A2AError { Task task = taskStore.get(params.id()); if (task == null) { @@ -385,6 +390,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError { LOGGER.debug("onMessageSend - task: {}; context {}", params.message().taskId(), params.message().contextId()); MessageSendSetup mss = initMessageSend(params, context); @@ -520,6 +526,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Flow.Publisher onMessageSendStream( MessageSendParams params, ServerCallContext context) throws A2AError { LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}", @@ -682,6 +689,7 @@ private void startBackgroundConsumption() { } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public TaskPushNotificationConfig onSetTaskPushNotificationConfig( TaskPushNotificationConfig params, ServerCallContext context) throws A2AError { if (pushConfigStore == null) { @@ -697,6 +705,7 @@ public TaskPushNotificationConfig onSetTaskPushNotificationConfig( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public TaskPushNotificationConfig onGetTaskPushNotificationConfig( GetTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError { if (pushConfigStore == null) { @@ -729,6 +738,7 @@ private PushNotificationConfig getPushNotificationConfig(ListTaskPushNotificatio } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Flow.Publisher onResubscribeToTask( TaskIdParams params, ServerCallContext context) throws A2AError { LOGGER.debug("onResubscribeToTask - taskId: {}", params.id()); @@ -760,6 +770,7 @@ public Flow.Publisher onResubscribeToTask( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig( ListTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError { if (pushConfigStore == null) { @@ -773,6 +784,7 @@ public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public void onDeleteTaskPushNotificationConfig( DeleteTaskPushNotificationConfigParams params, ServerCallContext context) { if (pushConfigStore == null) { diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java new file mode 100644 index 000000000..c9d289139 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java @@ -0,0 +1,65 @@ +package io.a2a.server.requesthandlers; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class RequestHandlerAttributeExtractor implements Supplier>> { + + @Override + @SuppressWarnings("NullAway") // Null checks performed inline + public Function> get() { + return ctx -> { + if (ctx == null || ctx.method() == null) { + return Collections.emptyMap(); + } + + String method = ctx.method().getName(); + if (method == null) { + return Collections.emptyMap(); + } + + Object[] parameters = ctx.parameters(); + if (parameters == null || parameters.length < 2) { + return Collections.emptyMap(); + } + + switch (method) { + case "onMessageSend", + "onMessageSendStream", + "onCancelTask", + "onResubscribeToTask", + "getPushNotificationConfig", + "setPushNotificationConfig", + "onGetTask", + "listPushNotificationConfig", + "deletePushNotificationConfig", + "onListTasks" -> { + if (parameters[0] == null || parameters[1] == null) { + return Collections.emptyMap(); + } + + ServerCallContext context = (ServerCallContext) parameters[1]; + Map attributes = new HashMap<>(); + attributes.put("request", parameters[0].toString()); + attributes.put("extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(","))); + + String a2aMethod = (String) context.getState().get("method"); + if (a2aMethod != null) { + attributes.put("a2a.method", a2aMethod); + } + + return attributes; + } + default -> { + return Collections.emptyMap(); + } + } + }; + } +} diff --git a/spec/src/main/java/io/a2a/spec/Message.java b/spec/src/main/java/io/a2a/spec/Message.java index 4383b1e02..978af4c40 100644 --- a/spec/src/main/java/io/a2a/spec/Message.java +++ b/spec/src/main/java/io/a2a/spec/Message.java @@ -94,6 +94,11 @@ public static Builder builder(Message message) { return new Builder(message); } + @Override + public String toString() { + return "Message{" + "role=" + role + ", parts=" + parts + ", messageId=" + messageId + ", contextId=" + contextId + ", taskId=" + taskId + ", metadata=" + metadata + ", referenceTaskIds=" + referenceTaskIds + ", extensions=" + extensions + '}'; + } + /** * Defines the role of the message sender in the conversation. *

diff --git a/spec/src/main/java/io/a2a/spec/Task.java b/spec/src/main/java/io/a2a/spec/Task.java index 24336dcd9..b51db72ce 100644 --- a/spec/src/main/java/io/a2a/spec/Task.java +++ b/spec/src/main/java/io/a2a/spec/Task.java @@ -103,6 +103,11 @@ public static Builder builder(Task task) { return new Builder(task); } + @Override + public String toString() { + return "Task{" + "id=" + id + ", contextId=" + contextId + ", status=" + status + ", artifacts=" + artifacts + ", history=" + history + ", metadata=" + metadata + '}'; + } + /** * Builder for constructing immutable {@link Task} instances. *

diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java index a025236f0..2338a7446 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java @@ -1,16 +1,20 @@ package io.a2a.transport.grpc.context; + +import java.util.Map; + +import io.a2a.spec.A2AMethods; import io.grpc.Context; /** * Shared gRPC context keys for A2A protocol data. - * + * * These keys provide access to gRPC context information similar to * Python's grpc.aio.ServicerContext, enabling rich context access * in service method implementations. */ public final class GrpcContextKeys { - + /** * Context key for storing the X-A2A-Version header value. * Set by server interceptors and accessed by service handlers. @@ -36,9 +40,16 @@ public final class GrpcContextKeys { * Context key for storing the method name being called. * Equivalent to Python's context.method() functionality. */ - public static final Context.Key METHOD_NAME_KEY = + public static final Context.Key GRPC_METHOD_NAME_KEY = Context.key("grpc-method-name"); + /** + * Context key for storing the method name being called. + * Equivalent to Python's context.method() functionality. + */ + public static final Context.Key METHOD_NAME_KEY = + Context.key("method"); + /** * Context key for storing the peer information. * Provides access to client connection details. @@ -46,6 +57,18 @@ public final class GrpcContextKeys { public static final Context.Key PEER_INFO_KEY = Context.key("grpc-peer-info"); + public static final Map METHOD_MAPPING = Map.of( + "SendMessage", A2AMethods.SEND_MESSAGE_METHOD, + "SendStreamingMessage", A2AMethods.SEND_STREAMING_MESSAGE_METHOD, + "GetTask", A2AMethods.GET_TASK_METHOD, + "ListTask", A2AMethods.LIST_TASK_METHOD, + "CancelTask", A2AMethods.CANCEL_TASK_METHOD, + "SubscribeToTask", A2AMethods.SUBSCRIBE_TO_TASK_METHOD, + "CreateTaskPushNotification", A2AMethods.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, + "GetTaskPushNotification", A2AMethods.GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, + "ListTaskPushNotification", A2AMethods.LIST_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, + "DeleteTaskPushNotification", A2AMethods.DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD); + private GrpcContextKeys() { // Utility class } diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java new file mode 100644 index 000000000..7c1211f9f --- /dev/null +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java @@ -0,0 +1,88 @@ +package io.a2a.transport.grpc.handler; + +import io.grpc.Context; +import io.a2a.server.interceptors.InvocationContext; +import io.a2a.transport.grpc.context.GrpcContextKeys; +import org.jspecify.annotations.NonNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Extracts OpenTelemetry trace attributes from gRPC invocation context. + *

+ * This extractor pulls relevant information from gRPC method calls to populate + * span attributes for distributed tracing. + */ +public class GrpcAttributeExtractor implements Supplier>> { + + @Override + public @NonNull Function> get() { + return ctx -> { + if (ctx == null || ctx.method() == null) { + return Collections.emptyMap(); + } + + String method = ctx.method().getName(); + if (method == null) { + return Collections.emptyMap(); + } + + Object[] parameters = ctx.parameters(); + if (parameters == null || parameters.length == 0) { + return Collections.emptyMap(); + } + + switch (method) { + case "sendMessage", + "getTask", + "listTasks", + "cancelTask", + "createTaskPushNotificationConfig", + "getTaskPushNotificationConfig", + "listTaskPushNotificationConfig", + "sendStreamingMessage", + "subscribeToTask", + "deleteTaskPushNotificationConfig" -> { + return extractAttributes(parameters); + } + default -> { + return Collections.emptyMap(); + } + } + }; + } + + /** + * Extracts trace attributes from method parameters and gRPC context. + * + * @param parameters the method invocation parameters + * @return map of attribute key-value pairs for tracing + */ + private @NonNull Map extractAttributes(Object @NonNull [] parameters) { + Context currentContext = Context.current(); + Map attributes = new HashMap<>(); + + // Add request parameter if available + if (parameters.length > 0 && parameters[0] != null) { + attributes.put("gen_ai.agent.a2a.request", parameters[0].toString()); + } + + // Add gRPC extensions header if available + String extensions = GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(); + if (extensions != null) { + attributes.put("extensions", extensions); + } + + // Add gRPC operation name if available + String operationName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext); + if (operationName != null) { + attributes.put("gen_ai.agent.operation.name", operationName); + } + + return attributes; + } +} diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java index 2998965ea..1f8d285b2 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java @@ -2,6 +2,7 @@ import static io.a2a.grpc.utils.ProtoUtils.FromProto; import static io.a2a.grpc.utils.ProtoUtils.ToProto; +import static io.a2a.server.interceptors.Kind.SERVER; import java.util.HashMap; import java.util.HashSet; @@ -26,6 +27,7 @@ import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.auth.User; import io.a2a.server.extensions.A2AExtensions; +import io.a2a.server.interceptors.Trace; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.server.version.A2AVersionValidator; import io.a2a.spec.A2AError; @@ -59,13 +61,14 @@ import io.grpc.Context; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import org.jspecify.annotations.Nullable; @Vetoed public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase { // Hook so testing can wait until streaming subscriptions are established. // Without this we get intermittent failures - private static volatile Runnable streamingSubscribedRunnable; + private static volatile @Nullable Runnable streamingSubscribedRunnable; private final AtomicBoolean initialised = new AtomicBoolean(false); @@ -76,6 +79,7 @@ public GrpcHandler() { } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void sendMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { try { @@ -97,6 +101,7 @@ public void sendMessage(io.a2a.grpc.SendMessageRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void getTask(io.a2a.grpc.GetTaskRequest request, StreamObserver responseObserver) { try { @@ -119,6 +124,7 @@ public void getTask(io.a2a.grpc.GetTaskRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void listTasks(io.a2a.grpc.ListTasksRequest request, StreamObserver responseObserver) { try { @@ -137,6 +143,7 @@ public void listTasks(io.a2a.grpc.ListTasksRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void cancelTask(io.a2a.grpc.CancelTaskRequest request, StreamObserver responseObserver) { try { @@ -159,6 +166,7 @@ public void cancelTask(io.a2a.grpc.CancelTaskRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void setTaskPushNotificationConfig(io.a2a.grpc.SetTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -182,6 +190,7 @@ public void setTaskPushNotificationConfig(io.a2a.grpc.SetTaskPushNotificationCon } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -205,6 +214,7 @@ public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationCon } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -229,6 +239,7 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().streaming()) { @@ -253,6 +264,7 @@ public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void subscribeToTask(io.a2a.grpc.SubscribeToTaskRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().streaming()) { @@ -278,12 +290,14 @@ private void convertToStreamResponse(Flow.Publisher publishe StreamObserver responseObserver) { CompletableFuture.runAsync(() -> { publisher.subscribe(new Flow.Subscriber() { - private Flow.Subscription subscription; + private Flow.@Nullable Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; - subscription.request(1); + if (this.subscription != null) { + this.subscription.request(1); + } // Notify tests that we are subscribed Runnable runnable = streamingSubscribedRunnable; @@ -299,7 +313,9 @@ public void onNext(StreamingEventKind event) { if (response.hasStatusUpdate() && response.getStatusUpdate().getFinal()) { responseObserver.onCompleted(); } else { - subscription.request(1); + if (this.subscription != null) { + this.subscription.request(1); + } } } @@ -333,6 +349,7 @@ public void getExtendedAgentCard(io.a2a.grpc.GetExtendedAgentCardRequest request } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -383,7 +400,7 @@ private ServerCallContext createCallContext(StreamObserver responseObserv state.put("grpc_metadata", grpcMetadata); } - String methodName = GrpcContextKeys.METHOD_NAME_KEY.get(currentContext); + String methodName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext); if (methodName != null) { state.put("grpc_method_name", methodName); } @@ -549,7 +566,7 @@ public static void setStreamingSubscribedRunnable(Runnable runnable) { * * @return the version header value, or null if not available */ - private String getVersionFromContext() { + private @Nullable String getVersionFromContext() { try { return GrpcContextKeys.VERSION_HEADER_KEY.get(); } catch (Exception e) { @@ -565,7 +582,7 @@ private String getVersionFromContext() { * * @return the extensions header value, or null if not available */ - private String getExtensionsFromContext() { + private @Nullable String getExtensionsFromContext() { try { return GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(); } catch (Exception e) { @@ -585,7 +602,7 @@ private String getExtensionsFromContext() { * @param key the context key to retrieve * @return the context value, or null if not available */ - private static T getFromContext(Context.Key key) { + private static @Nullable T getFromContext(Context.Key key) { try { return key.get(); } catch (Exception e) { @@ -600,7 +617,7 @@ private static T getFromContext(Context.Key key) { * * @return the gRPC Metadata object, or null if not available */ - protected static io.grpc.Metadata getCurrentMetadata() { + protected static io.grpc.@Nullable Metadata getCurrentMetadata() { return getFromContext(GrpcContextKeys.METADATA_KEY); } @@ -610,8 +627,8 @@ protected static io.grpc.Metadata getCurrentMetadata() { * * @return the method name, or null if not available */ - protected static String getCurrentMethodName() { - return getFromContext(GrpcContextKeys.METHOD_NAME_KEY); + protected static @Nullable String getCurrentMethodName() { + return getFromContext(GrpcContextKeys.GRPC_METHOD_NAME_KEY); } /** @@ -620,7 +637,7 @@ protected static String getCurrentMethodName() { * * @return the peer information, or null if not available */ - protected static String getCurrentPeerInfo() { + protected static @Nullable String getCurrentPeerInfo() { return getFromContext(GrpcContextKeys.PEER_INFO_KEY); } } diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java new file mode 100644 index 000000000..0cc667b2d --- /dev/null +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java @@ -0,0 +1,9 @@ +/* + * Copyright The WildFly Authors + * SPDX-License-Identifier: Apache-2.0 + */ +@NullMarked +package io.a2a.transport.grpc.handler; + +import org.jspecify.annotations.NullMarked; + diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java new file mode 100644 index 000000000..8b88eeecc --- /dev/null +++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractorTest.java @@ -0,0 +1,235 @@ +package io.a2a.transport.grpc.handler; + +import io.a2a.server.interceptors.InvocationContext; +import io.a2a.transport.grpc.context.GrpcContextKeys; +import io.grpc.Context; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Method; +import java.util.Map; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.*; + +class GrpcAttributeExtractorTest { + + private GrpcAttributeExtractor extractor; + private Context previousContext; + + @BeforeEach + void setUp() { + extractor = new GrpcAttributeExtractor(); + // Save the current context to restore it later + previousContext = Context.current(); + } + + @AfterEach + void tearDown() { + // Restore the previous context + if (previousContext != null) { + previousContext.attach(); + } + } + + @Test + void testExtractAttributes_SendMessage_Success() throws Exception { + // Create a mock method + Method method = TestService.class.getMethod("sendMessage", Object.class); + Object[] parameters = new Object[]{"test-request"}; + + // Set up gRPC context with test values + Context ctx = Context.current() + .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a.SendMessage") + .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1,ext2"); + + Context previous = ctx.attach(); + try { + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertEquals(3, attributes.size()); + assertEquals("test-request", attributes.get("gen_ai.agent.a2a.request")); + assertEquals("ext1,ext2", attributes.get("extensions")); + assertEquals("a2a.SendMessage", attributes.get("gen_ai.agent.operation.name")); + } finally { + ctx.detach(previous); + } + } + + @Test + void testExtractAttributes_NullContext() { + Function> function = extractor.get(); + Map attributes = function.apply(null); + + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + void testExtractAttributes_NullMethod() throws Exception { + Object[] parameters = new Object[]{"test-request"}; + assertThrows(IllegalArgumentException.class, () -> new InvocationContext(new TestService(), null, parameters)); + } + + @Test + void testExtractAttributes_NullParameters() throws Exception { + Method method = TestService.class.getMethod("sendMessage", Object.class); + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, null); + + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + void testExtractAttributes_EmptyParameters() throws Exception { + Method method = TestService.class.getMethod("sendMessage", Object.class); + Object[] parameters = new Object[]{}; + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + void testExtractAttributes_NullParameter() throws Exception { + Method method = TestService.class.getMethod("sendMessage", Object.class); + Object[] parameters = new Object[]{null}; + + Context ctx = Context.current() + .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a.SendMessage"); + + Context previous = ctx.attach(); + try { + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + // Should have operation name but not request (since parameter is null) + assertEquals(1, attributes.size()); + assertEquals("a2a.SendMessage", attributes.get("gen_ai.agent.operation.name")); + } finally { + ctx.detach(previous); + } + } + + @Test + void testExtractAttributes_NoGrpcContextKeys() throws Exception { + Method method = TestService.class.getMethod("sendMessage", Object.class); + Object[] parameters = new Object[]{"test-request"}; + + // Don't set any context values + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertEquals(1, attributes.size()); + assertEquals("test-request", attributes.get("gen_ai.agent.a2a.request")); + assertFalse(attributes.containsKey("extensions")); + assertFalse(attributes.containsKey("gen_ai.agent.operation.name")); + } + + @Test + void testExtractAttributes_UnknownMethod() throws Exception { + Method method = TestService.class.getMethod("unknownMethod"); + Object[] parameters = new Object[]{}; + + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + void testExtractAttributes_AllSupportedMethods() throws Exception { + String[] supportedMethods = { + "sendMessage", + "getTask", + "listTasks", + "cancelTask", + "createTaskPushNotificationConfig", + "getTaskPushNotificationConfig", + "listTaskPushNotificationConfig", + "sendStreamingMessage", + "subscribeToTask", + "deleteTaskPushNotificationConfig" + }; + + for (String methodName : supportedMethods) { + Method method = TestService.class.getMethod(methodName, Object.class); + Object[] parameters = new Object[]{"request-" + methodName}; + + Context ctx = Context.current() + .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, "a2a." + methodName) + .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1"); + + Context previous = ctx.attach(); + try { + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes, "Attributes should not be null for method: " + methodName); + assertFalse(attributes.isEmpty(), "Attributes should not be empty for method: " + methodName); + assertEquals("request-" + methodName, attributes.get("gen_ai.agent.a2a.request"), + "Request attribute should match for method: " + methodName); + assertEquals("ext1", attributes.get("extensions"), + "Extensions should match for method: " + methodName); + assertEquals("a2a." + methodName, attributes.get("gen_ai.agent.operation.name"), + "Operation name should match for method: " + methodName); + } finally { + ctx.detach(previous); + } + } + } + + @Test + void testExtractAttributes_OnlyExtensions() throws Exception { + Method method = TestService.class.getMethod("sendMessage", Object.class); + Object[] parameters = new Object[]{null}; + + Context ctx = Context.current() + .withValue(GrpcContextKeys.EXTENSIONS_HEADER_KEY, "ext1,ext2,ext3"); + + Context previous = ctx.attach(); + try { + InvocationContext invocationCtx = new InvocationContext(new TestService(), method, parameters); + Function> function = extractor.get(); + Map attributes = function.apply(invocationCtx); + + assertNotNull(attributes); + assertEquals(1, attributes.size()); + assertEquals("ext1,ext2,ext3", attributes.get("extensions")); + } finally { + ctx.detach(previous); + } + } + + // Test service class with all supported methods + public static class TestService { + public void sendMessage(Object request) {} + public void getTask(Object request) {} + public void listTasks(Object request) {} + public void cancelTask(Object request) {} + public void createTaskPushNotificationConfig(Object request) {} + public void getTaskPushNotificationConfig(Object request) {} + public void listTaskPushNotificationConfig(Object request) {} + public void sendStreamingMessage(Object request) {} + public void subscribeToTask(Object request) {} + public void deleteTaskPushNotificationConfig(Object request) {} + public void unknownMethod() {} + } +} diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java new file mode 100644 index 000000000..485f8adba --- /dev/null +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCAttributeExtractor.java @@ -0,0 +1,53 @@ +package io.a2a.transport.jsonrpc.handler; + +import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class JSONRPCAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters() == null ? new Object[]{} : ctx.parameters(); + if( ctx.parameters() == null || ctx.parameters().length < 2) { + throw new IllegalArgumentException("wrong parameters passed"); + } + switch (method) { + case "onMessageSend", + "onMessageSendStream", + "onCancelTask", + "onResubscribeToTask", + "getPushNotificationConfig", + "setPushNotificationConfig", + "onGetTask", + "listPushNotificationConfig", + "deletePushNotificationConfig", + "onListTasks" -> { + ServerCallContext context = (ServerCallContext) parameters[1]; + Map attributes = new HashMap<>(); + attributes.put("gen_ai.agent.a2a.request", parameters[0].toString()); + attributes.put("gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(","))); + + String operationName = (String) context.getState().get(METHOD_NAME_KEY); + if (operationName != null) { + attributes.put("gen_ai.agent.operation.name", operationName); + } + + return attributes; + } + default -> { + return Collections.emptyMap(); + } + } + }; + } +} diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index ca8149099..72ad40acc 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -1,5 +1,6 @@ package io.a2a.transport.jsonrpc.handler; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import java.util.concurrent.CompletableFuture; @@ -37,13 +38,13 @@ import io.a2a.server.PublicAgentCard; import io.a2a.server.ServerCallContext; import io.a2a.server.extensions.A2AExtensions; +import io.a2a.server.interceptors.Trace; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.server.util.async.Internal; import io.a2a.server.version.A2AVersionValidator; import io.a2a.spec.A2AError; import io.a2a.spec.AgentCard; import io.a2a.spec.ExtendedCardNotConfiguredError; -import io.a2a.spec.ExtensionSupportRequiredError; import io.a2a.spec.EventKind; import io.a2a.spec.InternalError; import io.a2a.spec.InvalidRequestError; @@ -53,7 +54,6 @@ import io.a2a.spec.Task; import io.a2a.spec.TaskNotFoundError; import io.a2a.spec.TaskPushNotificationConfig; -import io.a2a.spec.VersionNotSupportedError; import mutiny.zero.ZeroPublisher; import org.jspecify.annotations.Nullable; @@ -98,6 +98,7 @@ public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler reque this(agentCard, null, requestHandler, executor); } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallContext context) { try { A2AVersionValidator.validateProtocolVersion(agentCard, context); @@ -111,7 +112,7 @@ public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallC } } - + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public Flow.Publisher onMessageSendStream( SendStreamingMessageRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -136,6 +137,7 @@ public Flow.Publisher onMessageSendStream( } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onCancelTask(request.getParams(), context); @@ -150,6 +152,7 @@ public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallCont } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public Flow.Publisher onSubscribeToTask( SubscribeToTaskRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -158,7 +161,6 @@ public Flow.Publisher onSubscribeToTask( request.getId(), new InvalidRequestError("Streaming is not supported by the agent"))); } - try { Flow.Publisher publisher = requestHandler.onResubscribeToTask(request.getParams(), context); @@ -172,6 +174,7 @@ public Flow.Publisher onSubscribeToTask( } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public GetTaskPushNotificationConfigResponse getPushNotificationConfig( GetTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -189,6 +192,7 @@ public GetTaskPushNotificationConfigResponse getPushNotificationConfig( } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public SetTaskPushNotificationConfigResponse setPushNotificationConfig( SetTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -206,6 +210,7 @@ public SetTaskPushNotificationConfigResponse setPushNotificationConfig( } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onGetTask(request.getParams(), context); @@ -217,6 +222,7 @@ public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext conte } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext context) { try { ListTasksResult result = requestHandler.onListTasks(request.getParams(), context); @@ -228,6 +234,7 @@ public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public ListTaskPushNotificationConfigResponse listPushNotificationConfig( ListTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { @@ -245,6 +252,7 @@ public ListTaskPushNotificationConfigResponse listPushNotificationConfig( } } + @Trace(extractor=JSONRPCAttributeExtractor.class, kind = SERVER) public DeleteTaskPushNotificationConfigResponse deletePushNotificationConfig( DeleteTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java new file mode 100644 index 000000000..03427fdd6 --- /dev/null +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java @@ -0,0 +1,100 @@ +package io.a2a.transport.rest.handler; + +import static io.a2a.transport.rest.context.RestContextKeys.METHOD_NAME_KEY; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.jspecify.annotations.Nullable; + +public class RestAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters() == null ? new Object[]{} : ctx.parameters(); + if( ctx.parameters() == null || ctx.parameters().length < 2) { + throw new IllegalArgumentException("wrong parameters passed"); + } + switch (method) { + case "sendMessage", + "sendStreamingMessage"-> { + ServerCallContext context = (ServerCallContext) parameters[2]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.request", (String) parameters[0]); + result.putAll(processServerCallContext(context)); + return result; + } + case "setTaskPushNotificationConfiguration" -> { + ServerCallContext context = (ServerCallContext) parameters[3]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]); + putIfNotNull(result, "gen_ai.agent.a2a.request", (String) parameters[1]); + result.putAll(processServerCallContext(context)); + return result; + } + case "cancelTask", + "subscribeToTask", + "listTaskPushNotificationConfigurations" -> { + ServerCallContext context = (ServerCallContext) parameters[2]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]); + result.putAll(processServerCallContext(context)); + return result; + } + case "getTask" -> { + ServerCallContext context = (ServerCallContext) parameters[3]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]); + putIfNotNull(result, "gen_ai.agent.a2a.historyLength", "" + (int) parameters[1]); + result.putAll(processServerCallContext(context)); + return result; + } + case "getTaskPushNotificationConfiguration", + "deleteTaskPushNotificationConfiguration" -> { + ServerCallContext context = (ServerCallContext) parameters[3]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.taskId", (String) parameters[0]); + putIfNotNull(result, "gen_ai.agent.a2a.configId", (String) parameters[1]); + result.putAll(processServerCallContext(context)); + return result; + } + case "listTasks" -> { + ServerCallContext context = (ServerCallContext) parameters[7]; + Map result = new HashMap<>(); + putIfNotNull(result, "gen_ai.agent.a2a.contextId", (String) parameters[0]); + putIfNotNull(result, "gen_ai.agent.a2a.status", (String) parameters[1]); + result.putAll(processServerCallContext(context)); + return result; + } + default -> { + return Collections.emptyMap(); + } + } + }; + } + + private Map processServerCallContext(ServerCallContext context) { + Map attributes = new HashMap<>(); + attributes.put("gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(","))); + + String operationName = (String)context.getState().get(METHOD_NAME_KEY); + if (operationName != null) { + attributes.put("gen_ai.agent.a2a.operation.name", operationName); + } + + return attributes; + } + + private void putIfNotNull(Map map, String key, @Nullable String value) { + if (value != null) { + map.put(key, value); + } + } +} diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index 489bf909b..70a603476 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -1,5 +1,6 @@ package io.a2a.transport.rest.handler; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import static io.a2a.spec.A2AErrorCodes.JSON_PARSE_ERROR_CODE; @@ -29,6 +30,7 @@ import io.a2a.server.PublicAgentCard; import io.a2a.server.ServerCallContext; import io.a2a.server.extensions.A2AExtensions; +import io.a2a.server.interceptors.Trace; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.server.version.A2AVersionValidator; import io.a2a.server.util.async.Internal; @@ -105,7 +107,9 @@ public RestHandler(AgentCard agentCard, RequestHandler requestHandler, Executor this.executor = executor; } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse sendMessage(String body, String tenant, ServerCallContext context) { + try { A2AVersionValidator.validateProtocolVersion(agentCard, context); A2AExtensions.validateRequiredExtensions(agentCard, context); @@ -121,6 +125,7 @@ public HTTPRestResponse sendMessage(String body, String tenant, ServerCallContex } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse sendStreamingMessage(String body, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().streaming()) { @@ -140,6 +145,7 @@ public HTTPRestResponse sendStreamingMessage(String body, String tenant, ServerC } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse cancelTask(String taskId, String tenant, ServerCallContext context) { try { if (taskId == null || taskId.isEmpty()) { @@ -158,6 +164,7 @@ public HTTPRestResponse cancelTask(String taskId, String tenant, ServerCallConte } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, String body, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -175,6 +182,7 @@ public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, Stri } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse subscribeToTask(String taskId, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().streaming()) { @@ -190,6 +198,7 @@ public HTTPRestResponse subscribeToTask(String taskId, String tenant, ServerCall } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse getTask(String taskId, @Nullable Integer historyLength, String tenant, ServerCallContext context) { try { TaskQueryParams params = new TaskQueryParams(taskId, historyLength, tenant); @@ -205,6 +214,7 @@ public HTTPRestResponse getTask(String taskId, @Nullable Integer historyLength, } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String status, @Nullable Integer pageSize, @Nullable String pageToken, @Nullable Integer historyLength, @Nullable String lastUpdatedAfter, @@ -253,6 +263,7 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nullable String configId, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -268,6 +279,7 @@ public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nul } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, int pageSize, String pageToken, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -283,6 +295,7 @@ public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, in } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse deleteTaskPushNotificationConfiguration(String taskId, String configId, String tenant, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) {