Splitted read storage from write storage #3164
base: chore/add-remote-block-cache
Conversation
Instead of using channels, we now query data directly from the storage adapter.
PR Summary
Major refactor of the block-aggregator: introduces separate read/write storage layers and a direct storage-backed service (no channels), updates RPC wiring, and adapts tests and storage utilities.
Written by Cursor Bugbot for commit 6635578. This will update automatically on new commits.
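To make the refactor's shape easier to follow before the diff hunks below, here is a minimal self-contained sketch, assuming hypothetical names (ReadStorage, WriteStorage, InMemoryStorage, handle_block_range_request); it is not the actual fuel-core code. The point is that the API handler reads a storage view directly instead of sending a query over an mpsc channel and waiting for the aggregator task to reply.

```rust
// Hypothetical sketch of the read/write split described in the PR summary.
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug)]
struct Block(u32);

type BlockRange = Vec<Block>;

// Read side: the RPC/API layer only needs this view.
trait ReadStorage: Send + Sync {
    fn get_range(&self, first: u32, last: u32) -> anyhow::Result<BlockRange>;
}

// Write side: only the aggregator task needs this.
trait WriteStorage: Send + Sync {
    fn store(&self, height: u32, block: Block) -> anyhow::Result<()>;
}

#[derive(Default, Clone)]
struct InMemoryStorage {
    blocks: Arc<RwLock<BTreeMap<u32, Block>>>,
}

impl ReadStorage for InMemoryStorage {
    fn get_range(&self, first: u32, last: u32) -> anyhow::Result<BlockRange> {
        let guard = self.blocks.read().expect("lock poisoned");
        Ok(guard.range(first..=last).map(|(_, b)| b.clone()).collect())
    }
}

impl WriteStorage for InMemoryStorage {
    fn store(&self, height: u32, block: Block) -> anyhow::Result<()> {
        self.blocks.write().expect("lock poisoned").insert(height, block);
        Ok(())
    }
}

// No query message over a channel: the handler reads the storage view directly.
fn handle_block_range_request<R: ReadStorage>(
    reads: &R,
    first: u32,
    last: u32,
) -> anyhow::Result<BlockRange> {
    reads.get_range(first, last)
}
```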
subscribe_to_transactions: Option<bool>,
#[cfg(feature = "rpc")]
rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
We don't like integration over service :P ?
query_sender:
    tokio::sync::mpsc::Sender<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
#[cfg_attr(test, mockall::automock)]
pub trait BlocksAggregatorApi: Send + Sync + 'static {
In reality, we could just depend on the BlocksProvider trait here and directly on the broadcast::Sender that we have in the SharedStorage. This would parallel what we are doing in the aggregator service already, since it takes a dep on BlocksStorage and broadcast::Sender.
I think that's the right pattern.
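Roughly, the suggestion could look like the sketch below (illustrative only: ProtobufServer and the u32 payload are placeholders, and the BlocksProvider bound is omitted). The RPC adapter holds the read handle and the existing broadcast::Sender instead of a dedicated BlocksAggregatorApi port.

```rust
// Hypothetical shape of the reviewer's suggestion, not the actual PR code.
use tokio::sync::broadcast;

struct ProtobufServer<P> {
    // Read queries go straight to the blocks provider...
    provider: P,
    // ...and live-block subscriptions reuse the broadcast channel that the
    // aggregator service already publishes to.
    new_blocks: broadcast::Sender<u32>, // block payload type elided to u32 here
}

impl<P> ProtobufServer<P> {
    fn new(provider: P, new_blocks: broadcast::Sender<u32>) -> Self {
        Self { provider, new_blocks }
    }

    // Each subscriber gets its own receiver from the shared sender.
    fn subscribe(&self) -> broadcast::Receiver<u32> {
        self.new_blocks.subscribe()
    }
}
```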
tokio::sync::mpsc::Receiver<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
pub struct UninitializedTask<B> {
    addr: SocketAddr,
    api: B,
See above comment. Maybe rename it to something like block_aggregator.
let server = Server::new(self.api);
let router = tonic::transport::Server::builder()
    .add_service(ProtoBlockAggregatorServer::new(server));
self.router = Some(router);
Ok(())
}

fn get_router(&mut self) -> anyhow::Result<Router> {
    self.router
        .take()
        .ok_or_else(|| anyhow!("Router has not been initialized yet"))
let task = Task {
    addr: self.addr,
    router: Some(router),
};
Ok(task)
Do we really need an UninitializedTask just for this? What's the point of into_task here anyway, haha?
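For context, here is a simplified stand-in for the two-phase pattern being questioned (not the actual fuel_core_services wiring; the router is faked as a String): the uninitialized value exists only to defer building the router until into_task runs.

```rust
// Hypothetical illustration of the UninitializedTask -> Task hand-off.
use std::net::SocketAddr;

struct UninitializedTask<B> {
    addr: SocketAddr,
    api: B,
}

struct Task {
    addr: SocketAddr,
    router: Option<String>, // stands in for tonic's Router
}

impl<B> UninitializedTask<B> {
    // The reviewer's point: if this only shuffles fields across and builds
    // one value, the extra type may not be paying for itself.
    fn into_task(self) -> anyhow::Result<Task> {
        let _api = self.api; // placeholder for constructing the gRPC server from the API
        let router = format!("router bound to {}", self.addr);
        Ok(Task { addr: self.addr, router: Some(router) })
    }
}
```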
impl BlockAggregatorApi for ProtobufAPI {
    type BlockRangeResponse = BlockRangeResponse;
    type Block = ProtoBlock;
pub type APIService<B> = ServiceRunner<UninitializedTask<B>>;
Nit: I'm not a fan of aliases like this, as they add misdirection. If I publicly saw an APIService, I wouldn't assume that it was in fact the ServiceRunner.
    }
}

impl<S> BlocksAggregatorApi for SharedState<S>
We shouldn't be implementing a foreign port like this here. See the above comment on renaming and moving SharedState.
/// The Block Aggregator service, which aggregates blocks from a source and stores them in a database
/// Queries can be made to the service to retrieve data from the `DB`
pub struct Task<S1, S2, Blocks>
Why S1 and S2? Can we use S and P? Or Storage and Provider?
pub trait BlocksProvider: Send + Sync + 'static {
    type Block: Send + Sync + 'static;
    /// The type used to report a range of blocks
    type BlockRangeResponse;

    /// Stores a block with the given ID
    fn store_block(
        &mut self,
        block: BlockSourceEvent<Self::Block>,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Retrieves a range of blocks from the database
    fn get_block_range(
        &self,
        first: BlockHeight,
        last: BlockHeight,
    ) -> impl Future<Output = Result<Self::BlockRangeResponse>> + Send;
    ) -> Result<Self::BlockRangeResponse>;

    /// Retrieves the current height of the aggregated blocks. If there is a break in the blocks,
    /// i.e. the blocks are being aggregated out of order, return the height of the last
    /// contiguous block
    fn get_current_height(
        &self,
    ) -> impl Future<Output = Result<Option<BlockHeight>>> + Send;
    fn get_current_height(&self) -> Result<Option<BlockHeight>>;
}
This can move to the protobuf adapter code and replace the BlocksAggregatorApi port, like I mentioned above.
    }
}

impl<S> BlocksProvider for StorageBlocksProvider<S>
If we move this port to the protobuf adapter code, this impl should go with it.
    S: 'static,
    S: KeyValueInspect<Column = Column>,
    S: AtomicView,
    S::LatestView: Unpin + Send + Sync + KeyValueInspect<Column = Column> + 'static,
IDK how you remember which traits to use to keep it this clean :P
let take = match result {
    Ok((block_height, _)) => {
        if let Some(importer_height) = importer_height {
            *block_height < importer_height
Bug: Block at importer height lost during stream restart
The take_while condition uses *block_height < importer_height which excludes the block at exactly last_seen_importer_height from being fetched from the old blocks source. When restart_blocks_stream() is called, a new broadcast subscription is created after the block at that height was already broadcasted. Since the block is neither taken from old blocks (excluded by <) nor received by the new subscription (broadcast already happened), it gets permanently lost. The condition should use <= instead of < to ensure the boundary block is recovered from the old blocks source.
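A minimal standalone reproduction of the boundary condition (hypothetical heights and plain integers, not the actual stream-restart code): with `<` the block at exactly importer_height is dropped, while `<=` recovers it from the old source.

```rust
// Hypothetical repro of the off-by-one described in the bug report above.
fn main() {
    let importer_height: u32 = 5;
    // Block heights still available from the old blocks source.
    let old_blocks = vec![3u32, 4, 5];

    // Buggy condition: `<` stops before the block at `importer_height`, so
    // height 5 is neither taken here nor seen by the new subscription
    // (it was broadcast before the new receiver existed).
    let taken_lt: Vec<_> = old_blocks
        .iter()
        .copied()
        .take_while(|h| *h < importer_height)
        .collect();
    assert_eq!(taken_lt, vec![3, 4]);

    // Suggested fix: `<=` recovers the boundary block from the old source.
    let taken_le: Vec<_> = old_blocks
        .iter()
        .copied()
        .take_while(|h| *h <= importer_height)
        .collect();
    assert_eq!(taken_le, vec![3, 4, 5]);
}
```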