Replace aggregated payload RocksDB tables with in-memory circular buffers #182

pablodeymo wants to merge 1 commit into main from
Conversation
…fers and add slot-age pruning for attestation data

Aggregated payloads (new and known) are now stored in VecDeque-based circular buffers with a 4096-entry hard cap and FIFO eviction, matching Lantern's approach. This prevents unbounded memory growth when finalization stalls (~11.5 GB OOM after ~27 min). The two RocksDB tables (LatestNewAggregatedPayloads and LatestKnownAggregatedPayloads) are removed entirely, since aggregated payloads are ephemeral and rebuilt from gossip on restart. Additionally, GossipSignatures and AttestationDataByRoot are now pruned by slot age (64-slot retention window) every slot at interval 4, independent of finalization progress.
🤖 Kimi Code Review

Review Summary

This PR replaces RocksDB-backed aggregated payload storage with in-memory circular buffers (…)

Key Strengths

Minor Issues

1. Potential race condition in `promote_new_aggregated_payloads`:

```rust
pub fn promote_new_aggregated_payloads(&mut self) {
    // Use drain + extend to make this atomic
    let drained = self.new_payloads.entries.drain(..).collect::<Vec<_>>();
    self.known_payloads.entries.extend(drained);
}
```

2. Missing capacity check in `push_batch`:

```rust
fn push_batch(&mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>) {
    for (key, payload) in entries {
        // This ensures we never exceed capacity even temporarily
        self.push(key, payload);
    }
}
```

3. Documentation inconsistency (blockchain/src/store.rs:118)

Consensus Safety

The changes maintain all critical consensus properties:

Performance Impact

Positive: Eliminates RocksDB I/O for aggregated payloads (hot path). The 4096-entry cap (~30 min at 1 attestation/slot) is reasonable for the 9-validator testnet.

Recommendation: ✅ Approve - The PR is ready to merge after addressing the minor issues above.

Automated review by Kimi (Moonshot AI) · custom prompt
🤖 Codex Code Review

Could not run tests locally in this environment because …

Automated review by OpenAI Codex · custom prompt
🤖 Claude Code Review

PR Review: Replace Aggregated Payload Tables with In-Memory Circular Buffers

The motivation and overall approach are solid — replacing unbounded RocksDB append-only tables with fixed-size FIFO buffers is the right call for ephemeral data that doesn't need durability. The slot-age pruning is also a good complementary fix for the finalization-stall case. Findings below in descending severity.

Bug:
Greptile Summary

This PR eliminates the primary source of unbounded memory growth during finalization stalls by replacing two RocksDB aggregated-payload tables with in-memory …

Key changes:

The overall approach is sound and directly targets the root cause. Two findings:

Confidence Score: 4/5

Last reviewed commit: 626b966
```rust
pub fn prune_attestation_data_by_age(&mut self, current_slot: u64) -> (usize, usize) {
    let cutoff_slot = current_slot.saturating_sub(ATTESTATION_RETENTION_SLOTS);
    if cutoff_slot == 0 {
        return (0, 0);
    }
    let pruned_sigs = self.prune_gossip_signatures(cutoff_slot);
    let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot);
    (pruned_sigs, pruned_att_data)
}
```
During normal finalization, both prune_attestation_data_by_age (line 574-582) and update_checkpoints (line 456-457) call the same prune functions (prune_gossip_signatures and prune_attestation_data_by_root), causing redundant full-table scans when finalization advances.
While idempotent and functionally correct, you could optimize by skipping age-based pruning when finalization has already advanced past the age cutoff:
Suggested change (adds a skip when finalization already covers the age window):

```rust
pub fn prune_attestation_data_by_age(&mut self, current_slot: u64) -> (usize, usize) {
    let cutoff_slot = current_slot.saturating_sub(ATTESTATION_RETENTION_SLOTS);
    if cutoff_slot == 0 {
        return (0, 0);
    }
    // Skip if finalization-based pruning already covers this window
    if self.latest_finalized().slot >= cutoff_slot {
        return (0, 0);
    }
    let pruned_sigs = self.prune_gossip_signatures(cutoff_slot);
    let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot);
    (pruned_sigs, pruned_att_data)
}
```
```rust
#[derive(Clone, Default)]
struct PayloadBuffer {
    entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>,
    capacity: usize,
}

impl PayloadBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            entries: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Insert one entry, FIFO-evicting the oldest if at capacity.
    fn push(&mut self, key: SignatureKey, payload: StoredAggregatedPayload) {
        if self.entries.len() >= self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back((key, payload));
    }
}
```
#[derive(Default)] creates a broken buffer with capacity: 0. The push() method doesn't guard against this: when capacity == 0, the condition self.entries.len() >= self.capacity evaluates to 0 >= 0 == true, so pop_front() is called (no-op on empty deque), then push_back() inserts the entry, leaving len = 1 despite capacity = 0—violating the invariant entries.len() <= capacity.
While PayloadBuffer::default() is not called in current code (all instances use explicit PayloadBuffer::new(...)), the derived impl is a latent footgun. Remove the Default derive since Store initializes both buffers explicitly:
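The footgun described above can be reproduced in isolation. This is a minimal standalone sketch, with the key/payload types replaced by `u64` so it compiles on its own; the real `PayloadBuffer` holds `(SignatureKey, StoredAggregatedPayload)`:

```rust
use std::collections::VecDeque;

// Stand-in for the PR's PayloadBuffer, keeping only the parts relevant
// to the Default-derive issue.
#[derive(Default)]
struct Buffer {
    entries: VecDeque<u64>,
    capacity: usize,
}

impl Buffer {
    fn push(&mut self, v: u64) {
        // With capacity == 0 this condition is 0 >= 0 == true, so we
        // pop_front (a no-op on an empty deque) and then push anyway.
        if self.entries.len() >= self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back(v);
    }
}

fn main() {
    let mut b = Buffer::default(); // capacity: 0
    b.push(42);
    // The invariant `entries.len() <= capacity` is violated: 1 > 0.
    assert!(b.entries.len() > b.capacity);
}
```

Removing the `Default` derive, as suggested, makes this state unconstructible.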
Suggested change:

```rust
#[derive(Clone)]
struct PayloadBuffer {
    entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>,
    capacity: usize,
}
```
This is a valid concern
```rust
///
/// Independent of finalization — prevents unbounded growth when finalization stalls.
/// Returns (pruned_sigs, pruned_att_data).
pub fn prune_attestation_data_by_age(&mut self, current_slot: u64) -> (usize, usize) {
```
I think we don't need this. The circular buffer should be enough
```rust
new_payloads: PayloadBuffer,
known_payloads: PayloadBuffer,
```
These should go behind an Arc<Mutex<...>>, right?
Also, we should add a test for that:
1. create store
2. clone store
3. modify first store
4. check modification holds in cloned store
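The reviewer's four steps can be sketched as a standalone test. This is a hedged sketch with a stand-in `Store` holding only the shared buffer (the real `Store` has more fields, and the element type here is simplified to `u64`); it demonstrates that an `Arc<Mutex<...>>` field makes a modification through one clone visible in the other:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Stand-in Store: wrapping the buffer in Arc<Mutex<...>> means clones
// share one underlying deque instead of diverging on clone().
#[derive(Clone)]
struct Store {
    shared_payloads: Arc<Mutex<VecDeque<u64>>>,
}

fn main() {
    // 1. create store
    let store = Store {
        shared_payloads: Arc::new(Mutex::new(VecDeque::new())),
    };
    // 2. clone store
    let cloned = store.clone();
    // 3. modify first store
    store.shared_payloads.lock().unwrap().push_back(7);
    // 4. check modification holds in cloned store
    assert_eq!(cloned.shared_payloads.lock().unwrap().len(), 1);
}
```

With a plain `VecDeque` field (no `Arc<Mutex<...>>`), step 4 would fail: the clone would keep its own independent, empty deque.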
```diff
-        Self { backend }
+        Self {
+            backend,
+            new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP),
```
I think we should use a lower cap for new payloads
```rust
new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP),
known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP),
```
Why do we need these changes? I think we should refactor this in another PR
Code review

Found 1 issue:

The two constants that define the mismatch:

ethlambda/crates/storage/src/store.rs, lines 94 to 100 in 626b966

The silent skip in:

ethlambda/crates/storage/src/store.rs, lines 841 to 848 in 626b966

The fix should either align the two retention windows (prune …

🤖 Generated with Claude Code
Motivation
When finalization stalls, ethlambda's attestation-related tables grow without bound, causing OOM crashes. In our devnet observations, memory climbs to ~11.5 GB and crashes in ~27-minute cycles. The root causes are:
- `LatestKnownAggregatedPayloads` and `LatestNewAggregatedPayloads` grow without bound — they are append-only with no eviction. Each gossip message with 9 validators creates 9 copies of the same ~256 KB proof, and the read-merge-write pattern in RocksDB keeps accumulating entries.
- `GossipSignatures` and `AttestationDataByRoot` also grow unbounded when finalization stalls, since their pruning was only triggered by finalization events.

Both pruning mechanisms were gated on finalization advancing. When finalization stalls (which is exactly when memory pressure is highest), nothing gets cleaned up.
Solution
Two complementary mechanisms, inspired by Lantern (C client):
1. In-memory circular buffer for aggregated payloads (Lantern-style)
Replace the two RocksDB aggregated payload tables with `VecDeque`-based buffers that have a hard cap of 4096 entries and FIFO eviction.

Why in-memory buffers are safe:

- Only the `BlockChain` actor reads/writes them (P2P and RPC clones never access these tables).
- `Store` derives `Clone` (cloned 3× at startup for P2P, RPC, and BlockChain actors), but cloning `VecDeque` is fine — only the BlockChain clone populates its buffers.

Why 4096? With 9 validators, each producing 1 attestation per slot, that's ~455 unique attestation messages — roughly 30 minutes of history at 4-second slots. This matches Lantern's approach.
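The sizing claim can be checked with quick arithmetic, using only numbers stated in this PR (4096-entry cap, 9 validators at 1 attestation per slot, 4-second slots):

```rust
fn main() {
    let cap: u64 = 4096; // hard cap on buffer entries
    let attestations_per_slot: u64 = 9; // 9 validators, 1 attestation each
    let slot_seconds: u64 = 4;

    // How many slots of history fit in the buffer (integer division).
    let slots_of_history = cap / attestations_per_slot;
    // How long that history spans in minutes.
    let minutes = slots_of_history * slot_seconds / 60;

    assert_eq!(slots_of_history, 455); // ~455 slots, matching the text
    assert_eq!(minutes, 30);           // ~30 minutes of history
}
```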
What changes for each method:

- `insert_new_aggregated_payload` → `self.new_payloads.push(key, payload)`
- `insert_known_aggregated_payload` → `self.known_payloads.push(key, payload)`
- `iter_known_aggregated_payloads`: `.collect()` → `self.known_payloads.grouped().into_iter()`
- `iter_new_aggregated_payloads`: `.collect()` → `self.new_payloads.grouped().into_iter()`
- `promote_new_aggregated_payloads`: drain new → push_batch to known (2 lines)

2. Slot-age pruning for remaining DB tables
- `GossipSignatures` and `AttestationDataByRoot` remain in RocksDB (they serve other purposes), but are now pruned by slot age every slot at interval 4, independent of finalization.
- The cutoff is `current_slot.saturating_sub(64)` — entries at or below this slot are pruned.
- When `cutoff_slot == 0` (early slots < 64), pruning is skipped entirely, so spec tests (which operate within short slot ranges) are unaffected.

This ensures these tables stay bounded even during prolonged finalization stalls.
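The cutoff rule above can be sketched in isolation, with the constant name taken from the diff shown earlier (`ATTESTATION_RETENTION_SLOTS = 64`):

```rust
const ATTESTATION_RETENTION_SLOTS: u64 = 64;

/// Returns the prune cutoff for the given slot, or None when pruning
/// should be skipped (chain younger than the retention window).
fn cutoff(current_slot: u64) -> Option<u64> {
    // saturating_sub avoids underflow for slots below the window.
    let cutoff_slot = current_slot.saturating_sub(ATTESTATION_RETENTION_SLOTS);
    if cutoff_slot == 0 { None } else { Some(cutoff_slot) }
}

fn main() {
    assert_eq!(cutoff(10), None);      // early slots: pruning skipped
    assert_eq!(cutoff(64), None);      // 64 - 64 = 0, still skipped
    assert_eq!(cutoff(100), Some(36)); // prune entries at slot <= 36
}
```

This is why the spec tests (which stay under slot 64) never trigger age-based pruning.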
Changes
- `crates/storage/src/store.rs`
  - New `PayloadBuffer` struct — `VecDeque<(SignatureKey, StoredAggregatedPayload)>` with configurable capacity. Methods: `push` (FIFO eviction), `push_batch`, `drain`, `grouped` (HashMap by key), `unique_keys` (HashSet).
  - Added `new_payloads` / `known_payloads` fields to the `Store` struct, initialized in `init_store`.
  - Removed `iter_aggregated_payloads`, `iter_aggregated_payload_keys`, `insert_aggregated_payload`, `insert_aggregated_payloads_batch`, `prune_aggregated_payload_table`.
  - New `prune_attestation_data_by_age` — slot-age-based pruning for `GossipSignatures` and `AttestationDataByRoot`.
  - Removed aggregated-payload pruning from `update_checkpoints` (FIFO handles it now).
- `crates/storage/src/api/tables.rs`
  - Removed `LatestNewAggregatedPayloads` and `LatestKnownAggregatedPayloads` from the `Table` enum (10 → 8 tables).
- `crates/storage/src/backend/rocksdb.rs`
  - Removed `cf_name` match arms for the deleted table variants.
- `crates/blockchain/src/store.rs`
  - Added `prune_attestation_data_by_age` call at interval 4 (end of slot) with info logging when entries are pruned.

How to Test
All 26 fork choice spec tests pass unchanged — they operate within short slot ranges (< 64 slots), so `cutoff_slot` is 0 and age-based pruning short-circuits.

For production validation, deploy to a devnet and monitor memory usage during a finalization stall. Memory should plateau instead of climbing linearly.