
Conversation

@xgreenx commented Dec 9, 2025

  • Split read storage from write storage
  • Instead of using channels, now we query data directly from the storage adapter
  • Removed cloning of blocks by using Arc

@xgreenx self-assigned this Dec 9, 2025
@xgreenx requested review from a team, Dentosal, MitchTurner and rymnc as code owners December 9, 2025 20:19

cursor bot commented Dec 9, 2025

PR Summary

Major refactor of the block-aggregator: introduces separate read/write storage layers and a direct storage-backed service (no channels), updates RPC wiring, and adapts tests and storage utilities.

  • Block Aggregator API (service/task):
    • Introduce service and task modules with SharedState, Task, and StorageOrRemote{DB,BlocksProvider}.
    • Define BlocksProvider (read) and BlocksStorage (write) traits; enforce contiguous height writes.
    • Replace channel-based API with direct adapter calls via BlocksAggregatorApi in protobuf_adapter.
    • Remove integration and importer_and_db_source modules; add old_block_source with iterator-based block fetching and ConvertorAdapter.
  • RPC wiring and adapters:
    • Switch to fuel_core_block_aggregator_api::service::{Config, StorageMethod} across CLI, config, and sub-services.
    • Initialize RPC with OldBlocksSource<ConvertorAdapter,...>; use broadcast-backed new-block subscriptions.
    • Make TxReceipts::get_receipts sync; update ReceiptSource and tests.
  • Storage changes:
    • Add AsStructuredStorage and refactor storage DBs (storage_db, remote_cache) to use it; provide StorageBlocksProvider and RemoteBlocksProvider for reads.
    • Update contiguous height checks and metadata updates; simplify rollback via latest_height_from_metadata and rollback_last_block.
  • Core tweaks:
    • Adjust Database<BlockAggregatorDatabase> commit behavior; remove custom rollback method.
    • Refine CombinedDatabase::rollback_to with <= checks and block-aggregator storage rollback path.
  • Tests and tooling:
    • Update RPC and S3 tests for new receipts shape (nested vectors) and API; remove flaky concurrency test.
    • Add mocks (mockall) and itertools; tweak tokio features.
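The read/write split and the contiguous-height rule described above can be sketched roughly as follows. This is a simplified, hypothetical illustration: the PR's real traits use richer types (BlockSourceEvent, BlockRangeResponse) and different signatures, and the in-memory store here is purely for demonstration.

```rust
// Hypothetical sketch of a read/write storage split with a
// contiguous-height check on writes. Not the PR's actual code.
use std::collections::BTreeMap;

type BlockHeight = u32;

/// Read side: serves queries without mutable access.
trait BlocksProvider {
    type Block;
    fn get_block_range(&self, first: BlockHeight, last: BlockHeight) -> Vec<Self::Block>;
    fn get_current_height(&self) -> Option<BlockHeight>;
}

/// Write side: only accepts blocks at contiguous heights.
trait BlocksStorage {
    type Block;
    fn store_block(&mut self, height: BlockHeight, block: Self::Block) -> Result<(), String>;
}

struct InMemory {
    blocks: BTreeMap<BlockHeight, String>,
}

impl BlocksProvider for InMemory {
    type Block = String;
    fn get_block_range(&self, first: BlockHeight, last: BlockHeight) -> Vec<String> {
        self.blocks.range(first..=last).map(|(_, b)| b.clone()).collect()
    }
    fn get_current_height(&self) -> Option<BlockHeight> {
        self.blocks.keys().next_back().copied()
    }
}

impl BlocksStorage for InMemory {
    type Block = String;
    fn store_block(&mut self, height: BlockHeight, block: String) -> Result<(), String> {
        // Contiguous-height check: the next block must directly follow
        // the latest stored one (or be height 0 on an empty store).
        let expected = self.get_current_height().map(|h| h + 1).unwrap_or(0);
        if height != expected {
            return Err(format!("expected height {expected}, got {height}"));
        }
        self.blocks.insert(height, block);
        Ok(())
    }
}
```

Splitting the ports this way lets read-only consumers (e.g. the RPC adapter) hold the provider without contending for the writer.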

Written by Cursor Bugbot for commit 6635578. This will update automatically on new commits.

@xgreenx added the no changelog (Skip the CI check of the changelog modification) label Dec 9, 2025
subscribe_to_transactions: Option<bool>,
#[cfg(feature = "rpc")]
- rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
+ rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,

We don't like integration over service :P ?

query_sender:
tokio::sync::mpsc::Sender<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
#[cfg_attr(test, mockall::automock)]
pub trait BlocksAggregatorApi: Send + Sync + 'static {

@MitchTurner commented Dec 9, 2025


In reality, we could just depend on the BlocksProvider trait here and directly on the broadcast::Sender that we have in the SharedStorage. This would parallel what we are doing in the aggregator service already, since it takes a dep on BlocksStorage and broadcast::Sender.

I think that's the right pattern.
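The suggested dependency pattern can be sketched roughly like this: the adapter takes the read port plus a sender for new-block notifications directly, mirroring the aggregator service's dependency on the write port plus a sender. All names here are illustrative, and std::sync::mpsc stands in for tokio::sync::broadcast to keep the sketch self-contained.

```rust
// Illustrative sketch only: the adapter depends on the read port
// (BlocksProvider) and a sender, rather than a query channel.
// std::sync::mpsc stands in for tokio::sync::broadcast here.
use std::sync::mpsc;

type BlockHeight = u32;

trait BlocksProvider {
    type Block;
    fn get_current_height(&self) -> Option<BlockHeight>;
}

struct ProtobufAdapter<P: BlocksProvider> {
    provider: P,
    new_blocks: mpsc::Sender<P::Block>,
}

impl<P: BlocksProvider> ProtobufAdapter<P> {
    // Read queries go straight to the provider; no query channel needed.
    fn current_height(&self) -> Option<BlockHeight> {
        self.provider.get_current_height()
    }
    // New blocks are pushed to subscribers via the sender.
    fn notify(&self, block: P::Block) {
        let _ = self.new_blocks.send(block);
    }
}
```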

tokio::sync::mpsc::Receiver<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
pub struct UninitializedTask<B> {
addr: SocketAddr,
api: B,

See above comment. Maybe rename to something like block_aggregator.

Comment on lines +203 to +210
let server = Server::new(self.api);
let router = tonic::transport::Server::builder()
.add_service(ProtoBlockAggregatorServer::new(server));
self.router = Some(router);
Ok(())
}

fn get_router(&mut self) -> anyhow::Result<Router> {
self.router
.take()
.ok_or_else(|| anyhow!("Router has not been initialized yet"))
let task = Task {
addr: self.addr,
router: Some(router),
};
Ok(task)

Do we really need an UninitializedTask just for this? What's the point of into_task anyway haha?

impl BlockAggregatorApi for ProtobufAPI {
type BlockRangeResponse = BlockRangeResponse;
type Block = ProtoBlock;
pub type APIService<B> = ServiceRunner<UninitializedTask<B>>;

Nit: I'm not a fan of aliases like this, as it adds misdirection. If I publicly saw an APIService, I wouldn't assume that it was in fact the ServiceRunner.

}
}

impl<S> BlocksAggregatorApi for SharedState<S>

We shouldn't be implementing a foreign port like this here. See above comment on renaming and moving SharedState.


/// The Block Aggregator service, which aggregates blocks from a source and stores them in a database
/// Queries can be made to the service to retrieve data from the `DB`
pub struct Task<S1, S2, Blocks>

Why S1 and S2? Can we use S and P? Or Storage and Provider?

Comment on lines +13 to +29
pub trait BlocksProvider: Send + Sync + 'static {
type Block: Send + Sync + 'static;
/// The type used to report a range of blocks
type BlockRangeResponse;

/// Stores a block with the given ID
fn store_block(
&mut self,
block: BlockSourceEvent<Self::Block>,
) -> impl Future<Output = Result<()>> + Send;

/// Retrieves a range of blocks from the database
fn get_block_range(
&self,
first: BlockHeight,
last: BlockHeight,
- ) -> impl Future<Output = Result<Self::BlockRangeResponse>> + Send;
+ ) -> Result<Self::BlockRangeResponse>;

/// Retrieves the current height of the aggregated blocks. If there is a break in the blocks,
/// i.e. the blocks are being aggregated out of order, return the height of the last
/// contiguous block
- fn get_current_height(
-     &self,
- ) -> impl Future<Output = Result<Option<BlockHeight>>> + Send;
+ fn get_current_height(&self) -> Result<Option<BlockHeight>>;
}

This can move to the protobuf adapter code and replace the BlocksAggregatorAPI port, like I mentioned above.

}
}

impl<S> BlocksProvider for StorageBlocksProvider<S>

If we move this port to the protobuf adapter code, this impl should go with it.

Comment on lines +118 to +121
S: 'static,
S: KeyValueInspect<Column = Column>,
S: AtomicView,
S::LatestView: Unpin + Send + Sync + KeyValueInspect<Column = Column> + 'static,

IDK how you remember which traits to use to keep it this clean :P

let take = match result {
Ok((block_height, _)) => {
if let Some(importer_height) = importer_height {
*block_height < importer_height

Bug: Block at importer height lost during stream restart

The take_while condition uses *block_height < importer_height which excludes the block at exactly last_seen_importer_height from being fetched from the old blocks source. When restart_blocks_stream() is called, a new broadcast subscription is created after the block at that height was already broadcasted. Since the block is neither taken from old blocks (excluded by <) nor received by the new subscription (broadcast already happened), it gets permanently lost. The condition should use <= instead of < to ensure the boundary block is recovered from the old blocks source.
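The inclusive boundary check the report proposes can be sketched as a standalone predicate. Function and parameter names here are illustrative stand-ins, not the PR's actual code:

```rust
// Sketch of the proposed fix: the block at exactly `importer_height`
// must still come from the old blocks source, because the new broadcast
// subscription was created after that block was already broadcast.
type BlockHeight = u32;

fn take_from_old_source(
    block_height: BlockHeight,
    importer_height: Option<BlockHeight>,
) -> bool {
    match importer_height {
        // `<=` keeps the boundary block; `<` would drop it, and the new
        // subscription would never replay it, so it would be lost.
        Some(importer_height) => block_height <= importer_height,
        None => true,
    }
}
```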

