feat: add ChipIngress batch emitter support #21327
Conversation
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

✅ No conflicts with other open PRs targeting
Pull request overview
Adds `PublishBatch` support to the chip-testsink gRPC server so CRE/system tests don't fail with `UNIMPLEMENTED` when nodes emit batched ChIP ingress events.
Changes:
- Implement `PublishBatch` on the chip-testsink `ChipIngressServer`.
- Delegate batch handling to the existing `Publish` flow (including the configured `PublishFunc` and optional upstream forwarding).
```go
	for _, event := range batch.Events {
		if _, err := s.Publish(ctx, event); err != nil {
```
Calling s.Publish() inside the batch loop triggers the per-event async upstream forwarding goroutine in Publish(). For large batches this can create a burst of goroutines and N upstream RPCs. Consider handling upstream forwarding in PublishBatch with a single PublishBatch call (or at least a bounded worker/pool), and calling the configured PublishFunc directly for local handling to avoid unbounded goroutine/RPC fan-out per batch.
```diff
- if _, err := s.Publish(ctx, event); err != nil {
+ // Forward upstream synchronously to avoid spawning a goroutine per event.
+ if s.cfg.UpstreamEndpoint != "" {
+ 	forwardCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
+ 	_, err := s.upstream.Publish(forwardCtx, event)
+ 	cancelFn()
+ 	if err != nil {
+ 		log.Printf("failed to forward to upstream: %v", err)
+ 	}
+ }
+ if _, err := s.cfg.PublishFunc(ctx, event); err != nil {
```
```go
// It delegates each event in the batch to the configured PublishFunc,
// mirroring how the real ChIP Ingress processes batches atomically.
```
The doc comment claims this mirrors how the real ChIP ingress processes batches "atomically", but this implementation is not atomic: it publishes events one-by-one and can return an error after earlier events have already been accepted/forwarded. Please either adjust the comment to reflect best-effort sequential processing, or change the implementation to provide the atomicity guarantees being documented.
```diff
- // It delegates each event in the batch to the configured PublishFunc,
- // mirroring how the real ChIP Ingress processes batches atomically.
+ // It delegates each event in the batch to the configured PublishFunc
+ // sequentially, returning an error on the first failure. Earlier events
+ // in the batch may already have been published or forwarded when an error
+ // is returned, so processing is best-effort rather than atomic.
```
I see you updated files related to
```
# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.sum
```
Based on my thorough review of both PRs, here is a concrete implementation plan for registering the batch emitter.

Implementation Plan

Problem

The
Step 1: Expose the batch emitter from
| Concern | How it's addressed |
|---|---|
| Initialization order | `initGlobals` → `beholder.NewGRPCClient` runs in `beforeNode()` before `NewApplication`, so `beholder.GetClient()` is already populated |
| Double-start | `services.Engine`'s `Start()` is idempotent — the service framework calling `Start()` again is a no-op since it's already running |
| Nil safety | When `ChipIngressBatchEmitterEnabled = false` (default), `batchEmitterService` remains nil, and both nil checks protect against it |
| Shutdown order | Services in `srvcs` are stopped in reverse order. The emitter is closed before beholder's own `Client.Close()` (which closes the gRPC connection), matching the drain-before-disconnect requirement already implemented in PR #1862's reordered `Client.Close()` |
| Health checks | `ChipIngressBatchEmitter` implements `services.Service` via `services.Engine`, exposing `Name()`, `Ready()`, `HealthReport()` — all needed for `/health` |
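The nil-safety row above can be modeled in a few lines. This is a hypothetical sketch, not the PR's code: `Service`, `batchEmitter`, and `appendIfEnabled` are stand-ins for `services.Service`, the real emitter, and the inline check the plan describes in `application.go`.

```go
package main

import "fmt"

// Service mirrors the minimal surface of services.Service needed here;
// the real interface also has Start, Close, Ready, and HealthReport.
type Service interface {
	Name() string
}

type batchEmitter struct{}

func (batchEmitter) Name() string { return "ChipIngressBatchEmitter" }

// appendIfEnabled models the nil-safety check: when the flag is off the
// emitter stays nil and must not be appended to srvcs.
func appendIfEnabled(srvcs []Service, enabled bool, emitter Service) []Service {
	if enabled && emitter != nil {
		return append(srvcs, emitter)
	}
	return srvcs
}

func main() {
	var srvcs []Service
	srvcs = appendIfEnabled(srvcs, false, nil)            // flag off: nothing added
	srvcs = appendIfEnabled(srvcs, true, batchEmitter{})  // flag on: emitter registered
	fmt.Println(len(srvcs)) // 1
}
```

Appending to `srvcs` after `telemetryManager` also gives the reverse-order shutdown described in the table, since later entries are stopped first.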
Files summary
| File | Repo | Change |
|---|---|---|
| `pkg/beholder/client.go` | chainlink-common (PR #1862) | Add `BatchEmitter` field to `Client`, store it in constructor, add `GetChipIngressBatchEmitter()` getter |
| `core/services/chainlink/application.go` | chainlink (PR #21327) | After `telemetryManager`, retrieve and append the batch emitter to `srvcs` |
Note: The PR #21327 file list may be incomplete (API results limited to 30 files). You can view the full file list here.
…ingress-publishBatch
```
# Conflicts:
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.sum
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.sum
#	core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod
#	core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	devenv/go.mod
#	devenv/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.mod
#	system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum
```
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
- New `ChipIngressBatchEmitterEnabled` telemetry config flag (default `false`) to toggle batch mode per-node without a code change
- `PublishBatch` on the chip-testsink gRPC server so CRE system tests work when batch mode is enabled
- Bumped `chainlink-common` to include batch-emitter feature-flag support

Detail
- Config flag – new `ChipIngressBatchEmitterEnabled` boolean in `[Telemetry]`. Wired through the config interface, TOML types, beholder globals, docs, and test fixtures.
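As a rough sketch, the flag would then appear in a node's TOML like this (section and key names are taken from the summary above; the `true` value shown is the non-default):

```toml
[Telemetry]
# Default is false; set true to emit batched ChIP ingress events.
ChipIngressBatchEmitterEnabled = true
```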
- chip-testsink – the test-helper server only had single-event `Publish`, inheriting `UNIMPLEMENTED` for `PublishBatch`. Now delegates each batch event to the configured `PublishFunc` and forwards the full batch upstream in one RPC.

Why

9 CRE system tests depend on chip-testsink. Without this change they get gRPC `UNIMPLEMENTED` errors once batch mode is the default.

Requires

smartcontractkit/chainlink-common#1862