Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 152 additions & 1 deletion crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use ethlambda_crypto::aggregate_signatures;
use ethlambda_state_transition::{
is_proposer, process_block, process_slots, slot_is_justifiable_after,
};
use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload};
use ethlambda_storage::{
ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store, StoredAggregatedPayload,
};
use ethlambda_types::{
ShortRoot,
attestation::{
Expand Down Expand Up @@ -313,6 +315,20 @@ pub fn on_tick(
// NOTE: here we assume on_tick never skips intervals
match interval {
0 => {
// Periodic fallback pruning when finalization is stalled
let finalized = store.latest_finalized();
if slot > 0
&& slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS)
&& slot.saturating_sub(finalized.slot) > PRUNING_FALLBACK_INTERVAL_SLOTS
{
warn!(
%slot,
finalized_slot = finalized.slot,
"Finalization stalled, running periodic fallback pruning"
);
store.periodic_prune();
}

// Start of slot - process attestations if proposal exists
if should_signal_proposal {
accept_new_attestations(store, false);
Expand Down Expand Up @@ -1254,3 +1270,138 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool {
// Assume the ancestor is behind the latest finalized block
false
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use ethlambda_storage::{StorageBackend, Table, backend::InMemoryBackend};
    use ethlambda_types::{block::BlockHeader, primitives::ssz::Encode, state::State};

    use crate::MILLISECONDS_PER_INTERVAL;

    /// Generate a deterministic H256 root from an index.
    ///
    /// The index is written big-endian into the first 8 bytes, so distinct
    /// indices always yield distinct roots.
    fn root(index: u64) -> H256 {
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&index.to_be_bytes());
        H256::from(bytes)
    }

    /// Insert a block header (and dummy body + signature) for a given root and slot.
    ///
    /// The body and signature payloads are 4 opaque bytes — pruning only cares
    /// that the table entries exist, not about their contents.
    fn insert_header(backend: &dyn StorageBackend, root: H256, slot: u64) {
        let header = BlockHeader {
            slot,
            proposer_index: 0,
            parent_root: H256::ZERO,
            state_root: H256::ZERO,
            body_root: H256::ZERO,
        };
        // All three tables are written in a single batch so the block is
        // either fully present or absent.
        let mut batch = backend.begin_write().expect("write batch");
        let key = root.as_ssz_bytes();
        batch
            .put_batch(
                Table::BlockHeaders,
                vec![(key.clone(), header.as_ssz_bytes())],
            )
            .expect("put header");
        batch
            .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])])
            .expect("put body");
        batch
            .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])])
            .expect("put sigs");
        batch.commit().expect("commit");
    }

    /// Insert a dummy state for a given root.
    ///
    /// Only the presence of the entry matters to pruning, so 4 bytes suffice.
    fn insert_state(backend: &dyn StorageBackend, root: H256) {
        let mut batch = backend.begin_write().expect("write batch");
        let key = root.as_ssz_bytes();
        batch
            .put_batch(Table::States, vec![(key, vec![0u8; 4])])
            .expect("put state");
        batch.commit().expect("commit");
    }

    /// Count entries in a table.
    ///
    /// Iterates the whole table with an empty prefix; read errors are
    /// silently skipped (acceptable in tests over an in-memory backend).
    fn count_entries(backend: &dyn StorageBackend, table: Table) -> usize {
        let view = backend.begin_read().expect("read view");
        view.prefix_iterator(table, &[])
            .expect("iterator")
            .filter_map(|r| r.ok())
            .count()
    }

    /// Tick the store to exactly interval 0 of the given slot.
    ///
    /// Pre-sets store.time so on_tick processes a single interval, avoiding
    /// side effects from other intervals (update_safe_target, etc.).
    ///
    /// NOTE(review): `slot * INTERVALS_PER_SLOT - 1` underflows (and panics in
    /// debug builds) for slot == 0 — callers must pass slot >= 1. Confirm this
    /// precondition is acceptable or guard it.
    fn tick_to_interval_0(store: &mut Store, slot: u64) {
        store.set_time(slot * INTERVALS_PER_SLOT - 1);
        let timestamp_ms = slot * INTERVALS_PER_SLOT * MILLISECONDS_PER_INTERVAL;
        on_tick(store, timestamp_ms, false, false);
    }

    /// Finalization stuck at genesis while the slot clock passes a multiple of
    /// PRUNING_FALLBACK_INTERVAL_SLOTS → on_tick must run fallback pruning.
    #[test]
    fn on_tick_triggers_periodic_pruning_when_finalization_stalled() {
        let backend = Arc::new(InMemoryBackend::new());
        let state = State::from_genesis(0, vec![]);
        let mut store = Store::from_anchor_state(backend.clone(), state);

        // Insert 905 additional headers + states (exceeds STATES_TO_KEEP=900).
        // Genesis already inserted 1 header + state, so total = 906.
        for i in 1..=905u64 {
            insert_header(backend.as_ref(), root(i), i);
            insert_state(backend.as_ref(), root(i));
        }
        let states_before = count_entries(backend.as_ref(), Table::States);
        assert!(states_before > 900, "should exceed retention window");

        // Tick to interval 0 of slot 2*PRUNING_FALLBACK_INTERVAL_SLOTS.
        // Finalization is at slot 0, so lag = 14400 > 7200 → periodic pruning fires.
        tick_to_interval_0(&mut store, PRUNING_FALLBACK_INTERVAL_SLOTS * 2);

        let states_after = count_entries(backend.as_ref(), Table::States);
        assert!(
            states_after < states_before,
            "states should have been pruned: before={states_before}, after={states_after}"
        );
    }

    /// Finalization only one slot behind → the stall threshold is not crossed
    /// and on_tick must NOT run fallback pruning.
    #[test]
    fn on_tick_skips_periodic_pruning_when_finalization_healthy() {
        let backend = Arc::new(InMemoryBackend::new());
        let state = State::from_genesis(0, vec![]);
        let mut store = Store::from_anchor_state(backend.clone(), state);

        // Advance finalization first (this triggers update_checkpoints' own pruning).
        let target_slot = PRUNING_FALLBACK_INTERVAL_SLOTS;
        store.update_checkpoints(ForkCheckpoints::new(
            store.head(),
            None,
            Some(Checkpoint {
                slot: target_slot - 1,
                root: store.head(),
            }),
        ));

        // Insert excess states AFTER finalization pruning has run.
        for i in 1..=905u64 {
            insert_header(backend.as_ref(), root(i), i);
            insert_state(backend.as_ref(), root(i));
        }
        let states_before = count_entries(backend.as_ref(), Table::States);
        assert!(states_before > 900, "should exceed retention window");

        // Tick to interval 0 of the same target slot.
        // Finalization is at target_slot - 1, so lag = 1 ≤ threshold → no periodic pruning.
        tick_to_interval_0(&mut store, target_slot);

        let states_after = count_entries(backend.as_ref(), Table::States);
        assert_eq!(
            states_after, states_before,
            "no pruning should occur when finalization is healthy"
        );
    }
}
4 changes: 2 additions & 2 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pub mod backend;
mod store;
mod types;

pub use api::StorageBackend;
pub use store::{ForkCheckpoints, SignatureKey, Store};
pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table};
pub use store::{ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store};
pub use types::{StoredAggregatedPayload, StoredSignature};
129 changes: 129 additions & 0 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ const _: () = assert!(
"BLOCKS_TO_KEEP must be >= STATES_TO_KEEP"
);

/// Periodic fallback pruning interval: prune old blocks/states every N slots
/// even if finalization hasn't advanced. Set to 7200 slots (~8 hours at 4s/slot).
pub const PRUNING_FALLBACK_INTERVAL_SLOTS: u64 = 7200;

// ============ Key Encoding Helpers ============

/// Encode a SignatureKey (validator_id, root) to bytes.
Expand Down Expand Up @@ -417,6 +421,21 @@ impl Store {
}
}

/// Fallback pruning used while finalization is stalled.
///
/// Invokes `prune_old_states` and `prune_old_blocks` directly with the
/// currently finalized and justified roots as protected, reusing the same
/// retention-window logic as `update_checkpoints` — but without requiring
/// finalization to have advanced.
pub fn periodic_prune(&mut self) {
    // Never delete the chain anchors, even if they fall outside the window.
    let keep = [self.latest_finalized().root, self.latest_justified().root];
    let states_removed = self.prune_old_states(&keep);
    let blocks_removed = self.prune_old_blocks(&keep);
    // Stay quiet when nothing was deleted to avoid log noise on every run.
    if states_removed > 0 || blocks_removed > 0 {
        info!(
            pruned_states = states_removed,
            pruned_blocks = blocks_removed,
            "Periodic fallback pruning"
        );
    }
}

// ============ Blocks ============

/// Get block data for fork choice: root -> (slot, parent_root).
Expand Down Expand Up @@ -1430,4 +1449,114 @@ mod tests {
assert!(has_key(backend.as_ref(), Table::States, &finalized_root));
assert!(has_key(backend.as_ref(), Table::States, &justified_root));
}

// ============ Periodic Pruning Tests ============

/// Write the finalized and justified checkpoints into the metadata table.
fn set_checkpoints(backend: &dyn StorageBackend, finalized: Checkpoint, justified: Checkpoint) {
    // Both metadata entries go in one batch so the store never observes a
    // half-written checkpoint pair.
    let entries = vec![
        (KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes()),
        (KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes()),
    ];
    let mut batch = backend.begin_write().expect("write batch");
    batch
        .put_batch(Table::Metadata, entries)
        .expect("put checkpoints");
    batch.commit().expect("commit");
}

/// Excess states beyond `STATES_TO_KEEP` are removed, protected roots
/// survive, and block headers under `BLOCKS_TO_KEEP` are left untouched.
#[test]
fn periodic_prune_removes_old_states_and_blocks() {
    let backend = Arc::new(InMemoryBackend::new());
    let mut store = Store {
        backend: backend.clone(),
    };

    // Protected finalized/justified roots sit inside the retention window.
    let finalized_root = root(0);
    let justified_root = root(1);
    let finalized = Checkpoint {
        slot: 0,
        root: finalized_root,
    };
    let justified = Checkpoint {
        slot: 1,
        root: justified_root,
    };
    set_checkpoints(backend.as_ref(), finalized, justified);

    // Exceed the state retention window while staying under BLOCKS_TO_KEEP.
    let total_states = STATES_TO_KEEP + 5;
    for slot in 0..total_states as u64 {
        let r = root(slot);
        insert_header(backend.as_ref(), r, slot);
        insert_state(backend.as_ref(), r);
    }

    assert_eq!(count_entries(backend.as_ref(), Table::States), total_states);
    assert_eq!(
        count_entries(backend.as_ref(), Table::BlockHeaders),
        total_states
    );

    store.periodic_prune();

    // 5 excess states minus the 2 protected ones => 3 pruned.
    assert_eq!(
        count_entries(backend.as_ref(), Table::States),
        STATES_TO_KEEP + 2
    );
    // The finalized and justified states must always survive pruning.
    assert!(has_key(backend.as_ref(), Table::States, &finalized_root));
    assert!(has_key(backend.as_ref(), Table::States, &justified_root));

    // Header count stays below BLOCKS_TO_KEEP, so no blocks were pruned.
    assert_eq!(
        count_entries(backend.as_ref(), Table::BlockHeaders),
        total_states
    );
}

/// When nothing exceeds the retention window, periodic pruning is a no-op.
#[test]
fn periodic_prune_no_op_within_retention() {
    let backend = Arc::new(InMemoryBackend::new());
    let mut store = Store {
        backend: backend.clone(),
    };

    // Finalized and justified both point at the genesis-like root(0).
    set_checkpoints(
        backend.as_ref(),
        Checkpoint {
            slot: 0,
            root: root(0),
        },
        Checkpoint {
            slot: 0,
            root: root(0),
        },
    );

    // Fill exactly up to the retention limit — no excess entries at all.
    for slot in 0..STATES_TO_KEEP as u64 {
        insert_header(backend.as_ref(), root(slot), slot);
        insert_state(backend.as_ref(), root(slot));
    }

    store.periodic_prune();

    // Both tables must keep their full contents.
    assert_eq!(
        count_entries(backend.as_ref(), Table::States),
        STATES_TO_KEEP
    );
    assert_eq!(
        count_entries(backend.as_ref(), Table::BlockHeaders),
        STATES_TO_KEEP
    );
}
}