
Chipingress publish batch#1862

Open
thomaska wants to merge 29 commits into main from infoplat-3436-chipingress-publishBatch

Conversation


@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

  • Introduces ChipIngressBatchEmitter, a new batched event delivery path that
    replaces per-event Publish gRPC calls with buffered PublishBatch calls.
    Each unique (domain, entity) pair gets its own batch.Client worker with
    independent buffer, flush interval, and concurrency scaling.
  • Gated behind ChipIngressBatchEmitterEnabled feature flag (default: false).
    When disabled, the existing per-event ChipIngressEmitter is used unchanged.
  • Fixes shutdown ordering: emitter is now closed before the gRPC connection so
    the batch emitter can drain buffered events while the transport is still alive.
  • Fixes a bug in batch.Client.Stop() where the shutdown timeout context was
    immediately cancelled by close(stopCh) via CtxWithTimeout, preventing
    graceful drain.
  • Adds EmitWithCallback for callers that need delivery confirmation.
  • Exposes batch emitter configuration via beholder.Config and loop.EnvConfig
    (buffer size, batch size, max workers, send interval/timeout, drain timeout,
    max concurrent sends).
  • Requires the application-level logger (ChipIngressLogger) to be provided
    when the batch emitter is enabled.

Test plan

  • Unit tests for all emitter behaviors (batch assembly, max batch size, per-domain/entity isolation, buffer full drops, lifecycle start/close, callback contract)
  • Unit tests for batch client shutdown fix
  • E2E tests in atlas chip-e2e (separate PR, depends on this merge)

Supports

smartcontractkit/chainlink#21327

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested a review from a team as a code owner February 27, 2026 14:43
@github-actions

👋 thomaska, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

Contributor

Copilot AI left a comment


Pull request overview

This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.

Changes:

  • Introduced ChipIngressBatchEmitter with per-(domain, entity) worker goroutines for batching events
  • Added chipIngressEmitterWorker to handle batch assembly and sending with configurable timeouts
  • Removed goroutine wrapper from DualSourceEmitter.Emit() since batching is now non-blocking (channel send)
  • Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File Description
pkg/beholder/chip_ingress_batch_emitter.go New batch emitter with per-worker buffering and periodic flushing via PublishBatch
pkg/beholder/chip_ingress_emitter_worker.go Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops
pkg/beholder/chip_ingress_batch_emitter_test.go Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults
pkg/beholder/dual_source_emitter.go Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking
pkg/beholder/client.go Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering
pkg/beholder/config.go Added 4 new config fields with inline documentation and default values
pkg/beholder/config_test.go Updated expected output to include new config fields
Comments suppressed due to low confidence (2)

pkg/beholder/config.go:50

  • The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
	ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.

pkg/beholder/client.go:248

  • The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
		for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {


@github-actions

github-actions bot commented Feb 27, 2026

⚠️ API Diff Results - github.com/smartcontractkit/chainlink-common

⚠️ Breaking Changes (1)

pkg/beholder (1)
  • NewDualSourceEmitter — Type changed:
func(Emitter, Emitter) (Emitter, error) → func(Emitter, Emitter, bool) (Emitter, error)

✅ Compatible Changes (20)

pkg/beholder (2)
  • ChipIngressBatchEmitterService — ➕ Added

  • NewChipIngressBatchEmitterService — ➕ Added

pkg/beholder.(*Client) (1)
  • ManagedServices — ➕ Added
pkg/beholder.Config (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/beholder.writerClientConfig (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/loop.EnvConfig (1)
  • ChipIngressBatchEmitterEnabled — ➕ Added

📄 View full apidiff report

@smartcontractkit smartcontractkit deleted a comment from github-actions bot Feb 27, 2026
return nil, err
}

chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
Author

add a feature flag

// via chipingress.Client.PublishBatch on a periodic interval.
// It satisfies the Emitter interface so it can be used as a drop-in replacement
// for ChipIngressEmitter.
type ChipIngressBatchEmitter struct {
Author

name it Service

return e, nil
}

func (e *ChipIngressBatchEmitter) start(_ context.Context) error {

what's the role of this function if it always returns nil?

Author

it was mostly added as a placeholder, but it can be omitted as well.
And after checking, it's also omitted in EventHandler in core/services/workflows/syncer/v2/handler.go, so omitting it is probably more consistent.


// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
// Call Start() to begin health monitoring, and Close() to stop all workers.
func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) {

this is purely stylistic, so feel free to ignore it: make the logger the last param, and after renaming the struct to ChipIngressBatchService, adjust the constructor name to match

@thomaska thomaska requested a review from a team as a code owner March 2, 2026 11:18
@thomaska
Author

Blocking behavior regression in DualSourceEmitter when batch mode is off

In pkg/beholder/dual_source_emitter.go, chip-ingress emit was changed from async to inline:

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { ... }

This makes DualSourceEmitter.Emit block on chip-ingress in legacy mode (ChipIngressBatchEmitterEnabled=false, still default in pkg/beholder/client.go). Previously this path was fire-and-forget via goroutine, so caller latency/backpressure characteristics change materially.

Can we keep async behavior for non-batch emitters (or gate sync behavior strictly to ChipIngressBatchEmitter) to avoid regressing existing deployments?

that's a very good spot, thank you @pkcll 🙏


queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
if sendErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
Contributor

use the context passed in from the parameters

ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
// Use a standalone timeout context so the shutdown wait isn't cancelled
// by close(b.stopCh) below.
ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout)
Contributor

not sure I follow why we are doing this?

@thomaska thomaska Mar 16, 2026

This is a potential issue that opus pointed out and it made sense to me.
TL;DR: the timeout was never respected and the drain didn't have time to run properly, because the context was cancelled right after it was created.
Longer version:
In L121, close(b.stopCh) closes the stop channel,
which cancels the context created in L113: ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout),
so the "Done" branch of the following select executes instantaneously, along with the warning message:

		select {
		case <-done:
			// All successfully shutdown
--->		case <-ctx.Done(): // timeout or context cancelled
			b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
		}

Does this make any sense to you?

Comment on lines 27 to 48
@@ -42,6 +43,7 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt
chipIngressEmitter: chipIngressEmitter,
otelCollectorEmitter: otelCollectorEmitter,
log: logger,
nonBlockingEmitter: nonBlockingChipIngress,
stopCh: make(services.StopChan),
}, nil
Contributor

it should always be non-blocking

@thomaska thomaska Mar 16, 2026

The eventual implementation is always non-blocking; maybe the name is not very descriptive?
Would something like chipIngressBatchEmitterEnabled be better? This is essentially the feature flag being propagated.

Comment on lines +69 to +85
} else {
// Legacy ChipIngressEmitter.Emit is a synchronous gRPC call;
// fire-and-forget via goroutine to avoid blocking the caller.
if err := d.wg.TryAdd(1); err != nil {
return err
}
go func(ctx context.Context) {
defer d.wg.Done()
var cancel context.CancelFunc
ctx, cancel = d.stopCh.Ctx(ctx)
defer cancel()

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
d.log.Infof("failed to emit to chip ingress: %v", err)
}
}(context.WithoutCancel(ctx))
}
Contributor

if we pass in the batch client, can simply just queue the message ?

Author

I'm not sure I understand :/ Should we remove completely the previous implementation and use the batch client everywhere? If yes should we remove the feature flag as well?

Contributor

disregard my comment

envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
Contributor

maybe this should always be enabled

Author

I think we were discussing with @pkcll to merge this initially with the flag disabled

Contributor

makes sense

if sendErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
} else {
e.metrics.eventsSent.Add(context.Background(), 1, metricAttrs)
Contributor

same as above

}
})
if queueErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
Contributor

same as above

@pkcll pkcll Mar 13, 2026

Please add metrics to batch client to observe batching behavior (could be done in separate PR)

  • batch req size (in messages) vs max batch size
  • batch req size in bytes vs max grpc req size
  • req latency
  • [optional] report batch client configuration as a gauge metric like we do for beholder

pkcll
pkcll previously approved these changes Mar 16, 2026
if err != nil {
return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err)
}
if err = batchEmitterService.Start(context.Background()); err != nil {
Contributor

You need to pass the parent component context to batchEmitterService.Start

if err != nil {
return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err)
var chipIngressEmitter Emitter
if cfg.ChipIngressBatchEmitterEnabled {
Contributor

this code needs to be added to both grpc client and http beholder clients

Author

pkg/beholder/httpclient.go currently has no chip ingress client.
Should we add support for it in this PR?

}

// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
func NewChipIngressBatchEmitter(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitter, error) {
Contributor

You need to be able to pass parent context to it to be able to gracefully start and stop.

// and logs will be sent via OTLP using the regular Logger instead of calling Emit
emitter := NewMessageEmitter(messageLogger)

var batchEmitterService *ChipIngressBatchEmitter
Contributor

To avoid confusion lets call ChipIngressBatchEmitter something with Service word in it
e.g ChipIngressBatchEmitterService
so that its clear its long running and implement Start/Stop

Contributor

pkg/beholder/chip_ingress_batch_emitter.go -> pkg/beholder/chip_ingress_batch_emitter_service.go ?
