Conversation
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!
Pull request overview
This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.
Changes:
- Introduced ChipIngressBatchEmitter with per-(domain, entity) worker goroutines for batching events
- Added chipIngressEmitterWorker to handle batch assembly and sending with configurable timeouts
- Removed goroutine wrapper from DualSourceEmitter.Emit() since batching is now non-blocking (channel send)
- Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| pkg/beholder/chip_ingress_batch_emitter.go | New batch emitter with per-worker buffering and periodic flushing via PublishBatch |
| pkg/beholder/chip_ingress_emitter_worker.go | Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops |
| pkg/beholder/chip_ingress_batch_emitter_test.go | Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults |
| pkg/beholder/dual_source_emitter.go | Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking |
| pkg/beholder/client.go | Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering |
| pkg/beholder/config.go | Added 4 new config fields with inline documentation and default values |
| pkg/beholder/config_test.go | Updated expected output to include new config fields |
Comments suppressed due to low confidence (2)
pkg/beholder/config.go:50
- The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.
pkg/beholder/client.go:248
- The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {
return nil, err
}

chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
// via chipingress.Client.PublishBatch on a periodic interval.
// It satisfies the Emitter interface so it can be used as a drop-in replacement
// for ChipIngressEmitter.
type ChipIngressBatchEmitter struct {

return e, nil
}

func (e *ChipIngressBatchEmitter) start(_ context.Context) error {
what's the role of this function if it always returns nil?

it was mostly added as a placeholder, but can be omitted as well.
And after checking, in core/services/workflows/syncer/v2/handler.go the EventHandler also omits it, so omitting it here is probably more consistent.
// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
// Call Start() to begin health monitoring, and Close() to stop all workers.
func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) {
this is purely stylistic, and feel free to ignore it: make the logger the last param, and after renaming the struct to ChipIngressBatchService, make sure to adjust the name of the constructor

that's a very good spot, thank you @pkcll 🙏
|
|
||
| queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { | ||
| if sendErr != nil { | ||
| e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs) |
use the context passed in from the parameters
ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
// Use a standalone timeout context so the shutdown wait isn't cancelled
// by close(b.stopCh) below.
ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout)
not sure I follow why we are doing this?

This is a potential issue that opus pointed out and it made sense to me.
TL;DR: the timeout would never be respected and the drain didn't have time to run properly, as the same context was being cancelled right after it was created.
Longer version:
In L121: close(b.stopCh) -- closes the context
which is the same used in L113: ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
thus the "Done" part in the following select is executed instantaneously along with the warning message:
select {
case <-done:
	// All successfully shutdown
---> case <-ctx.Done(): // timeout or context cancelled
	b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
}
Does this make any sense to you?
@@ -42,6 +43,7 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt
	chipIngressEmitter:   chipIngressEmitter,
	otelCollectorEmitter: otelCollectorEmitter,
	log:                  logger,
	nonBlockingEmitter:   nonBlockingChipIngress,
	stopCh:               make(services.StopChan),
}, nil
it should always be non-blocking

The eventual implementation is always non-blocking. The name is not very descriptive?
Would something like chipIngressBatchEmitterEnabled be better? This is essentially the feature flag being propagated.
} else {
	// Legacy ChipIngressEmitter.Emit is a synchronous gRPC call;
	// fire-and-forget via goroutine to avoid blocking the caller.
	if err := d.wg.TryAdd(1); err != nil {
		return err
	}
	go func(ctx context.Context) {
		defer d.wg.Done()
		var cancel context.CancelFunc
		ctx, cancel = d.stopCh.Ctx(ctx)
		defer cancel()

		if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
			d.log.Infof("failed to emit to chip ingress: %v", err)
		}
	}(context.WithoutCancel(ctx))
}
if we pass in the batch client, can we simply just queue the message?

I'm not sure I understand :/ Should we remove completely the previous implementation and use the batch client everywhere? If yes, should we remove the feature flag as well?
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressEndpoint            = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection  = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
maybe this should always be enabled
I think we were discussing with @pkcll to merge this initially with the flag disabled
	if sendErr != nil {
		e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
	} else {
		e.metrics.eventsSent.Add(context.Background(), 1, metricAttrs)
	}
})
if queueErr != nil {
	e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
Please add metrics to batch client to observe batching behavior (could be done in a separate PR):
- batch req size (in messages) vs max batch size
- batch req size in bytes vs max grpc req size
- req latency
- [optional] report batch client configuration as a gauge metric like we do for beholder
pkg/beholder/client.go
Outdated
if err != nil {
	return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err)
}
if err = batchEmitterService.Start(context.Background()); err != nil {
You need to pass the parent component context to batchEmitterService.Start
if err != nil {
	return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err)
var chipIngressEmitter Emitter
if cfg.ChipIngressBatchEmitterEnabled {
this code needs to be added to both grpc client and http beholder clients
pkg/beholder/httpclient.go currently has no chip ingress client.
Should we add support for it in this PR?
}

// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
func NewChipIngressBatchEmitter(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitter, error) {
You need to be able to pass parent context to it to be able to gracefully start and stop.
pkg/beholder/client.go
Outdated
// and logs will be sent via OTLP using the regular Logger instead of calling Emit
emitter := NewMessageEmitter(messageLogger)

var batchEmitterService *ChipIngressBatchEmitter
To avoid confusion let's call ChipIngressBatchEmitter something with the word Service in it, e.g. ChipIngressBatchEmitterService, so that it's clear it's long-running and implements Start/Stop.

pkg/beholder/chip_ingress_batch_emitter.go -> pkg/beholder/chip_ingress_batch_emitter_service.go ?
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
- ChipIngressBatchEmitter, a new batched event delivery path that replaces per-event Publish gRPC calls with buffered PublishBatch calls. Each unique (domain, entity) pair gets its own batch.Client worker with independent buffer, flush interval, and concurrency scaling.
- ChipIngressBatchEmitterEnabled feature flag (default: false). When disabled, the existing per-event ChipIngressEmitter is used unchanged.
- The batch emitter can drain buffered events while the transport is still alive.
- batch.Client.Stop(), where the shutdown timeout context was immediately cancelled by close(stopCh) via CtxWithTimeout, preventing graceful drain.
- EmitWithCallback for callers that need delivery confirmation.
- beholder.Config and loop.EnvConfig (buffer size, batch size, max workers, send interval/timeout, drain timeout, max concurrent sends).
- ChipIngressLogger) to be provided when the batch emitter is enabled.
Test plan
- chip-e2e (separate PR, depends on this merge)

Supports smartcontractkit/chainlink#21327