Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0f80f28
Skeleton for validation
AurelienFT May 27, 2025
256ba83
lock
AurelienFT May 27, 2025
2da2db1
chore: clean up validation
rymnc May 27, 2025
b0d171a
feat(parallel-executor): dependency graph for validation (#3032)
rymnc May 30, 2025
3e7acc7
chore: integrate dependency graph into recreate_block
rymnc Jun 2, 2025
b674d25
Add more logic for taking transaction to execute along with their dat…
AurelienFT Jun 11, 2025
e1ea432
Merge branch 'create_exec_sequencer_skeleton' into create_validation
AurelienFT Jun 12, 2025
4817870
Add more field on validation result
AurelienFT Jun 12, 2025
6616a3c
Add execution of transactions in validation
AurelienFT Jun 13, 2025
f86763b
Generate validation result
AurelienFT Jun 13, 2025
03f029a
Add creation of the block in validator
AurelienFT Jun 13, 2025
1ee06cb
add todo
AurelienFT Jun 13, 2025
d06b26c
Add coin verification on validation
AurelienFT Jun 16, 2025
a7e9500
Merge branch 'create_exec_sequencer_skeleton' into create_validation
AurelienFT Jun 16, 2025
2273de6
Add coin to root module
AurelienFT Jun 16, 2025
273d966
Update coin usage
AurelienFT Jun 16, 2025
e94256f
Merge branch 'create_exec_sequencer_skeleton' into create_validation
AurelienFT Jun 16, 2025
c27abc8
add creation of executor in validator
AurelienFT Jun 16, 2025
532f6fd
Merge branch 'create_exec_sequencer_skeleton' into create_validation
AurelienFT Jun 19, 2025
584fbb8
Fix validator compilation error
AurelienFT Jun 24, 2025
f759871
Fix all compilation error
AurelienFT Jun 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/services/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ where
}

#[allow(clippy::too_many_arguments)]
fn execute_transaction_and_commit<'a, W>(
pub fn execute_transaction_and_commit<'a, W>(
&'a self,
block: &'a mut PartialFuelBlock,
storage_tx: &mut BlockStorageTransaction<W>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use fuel_core_types::{
};
use fxhash::FxHashMap;

use super::SchedulerError;

use crate::scheduler::SchedulerError;
#[derive(Debug, Eq)]
pub(crate) struct CoinInBatch {
/// The utxo id
Expand Down
124 changes: 105 additions & 19 deletions crates/services/parallel-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use crate::{
SchedulerExecutionResult,
},
tx_waiter::NoWaitTxs,
txs_ext::TxCoinbaseExt,
validator::{
self,
Validator,
},
};
use fuel_core_executor::{
executor::{
Expand Down Expand Up @@ -37,9 +42,12 @@ use fuel_core_storage::{
},
};
use fuel_core_types::{
blockchain::block::{
Block,
PartialFuelBlock,
blockchain::{
block::{
Block,
PartialFuelBlock,
},
header::PartialBlockHeader,
},
fuel_tx::{
Bytes32,
Expand Down Expand Up @@ -75,7 +83,10 @@ pub struct Executor<S, R, P> {
memory_pool: Option<Vec<MemoryInstance>>,
}

impl<S, R, P> Executor<S, R, P> {
impl<S, R, P> Executor<S, R, P>
where
R: Clone,
{
pub fn new(
storage_view_provider: S,
relayer: R,
Expand Down Expand Up @@ -197,20 +208,28 @@ where
return Ok(Default::default());
};

let prev_block = structured_storage
let prev_da = structured_storage
.storage::<FuelBlocks>()
.get(&prev_height)?
.ok_or_else(|| {
SchedulerError::InternalError("Previous block not found".to_string())
})?;
})?
.header()
.da_height();

if prev_block.header().da_height() != components.header_to_produce.da_height {
let (storage_tx, event_inbox_root) = self.process_l1_txs(
let mut storage_tx = StorageTransaction::transaction(
structured_storage.into_storage(),
ConflictPolicy::Fail,
Default::default(),
);

if prev_da != components.header_to_produce.da_height {
let event_inbox_root = self.process_l1_txs(
partial_block,
components.coinbase_recipient,
execution_data,
memory,
structured_storage.into_storage(),
&mut storage_tx,
executor,
)?;
Ok((storage_tx.into_changes(), event_inbox_root))
Expand All @@ -226,24 +245,18 @@ where
coinbase_contract_id: ContractId,
execution_data: &mut ExecutionData,
memory: &mut MemoryInstance,
view: View,
storage_tx: &mut StorageTransaction<View>,
executor: &mut BlockExecutor<R, NoWaitTxs, P>,
) -> Result<(StorageTransaction<View>, Bytes32), SchedulerError> {
let mut storage_tx = StorageTransaction::transaction(
view,
ConflictPolicy::Fail,
Default::default(),
);

) -> Result<Bytes32, SchedulerError> {
executor.process_l1_txs(
partial_block,
coinbase_contract_id,
&mut storage_tx,
storage_tx,
execution_data,
memory,
)?;

Ok((storage_tx, execution_data.event_inbox_root))
Ok(execution_data.event_inbox_root)
}

/// Run the parallel executor for L2 transactions
Expand Down Expand Up @@ -418,3 +431,76 @@ where
unimplemented!("Dry run not implemented yet");
}
}

impl<S, R, View> Executor<S, R, validator::NoPreconfirmationSender>
where
R: RelayerPort + Clone + Send + 'static,
S: AtomicView<LatestView = View> + Clone + Send + 'static,
View: KeyValueInspect<Column = Column> + Send + Sync + 'static,
{
async fn validate_block(
mut self,
block: &Block,
mut block_storage_tx: StorageTransaction<View>,
) -> Result<validator::ValidationResult, SchedulerError> {
let mut data = ExecutionData::new();

let partial_header = PartialBlockHeader::from(block.header());
let mut partial_block = PartialFuelBlock::new(partial_header, vec![]);
let transactions = block.transactions();
let mut memory = MemoryInstance::new();
let consensus_parameters = {
block_storage_tx
.storage::<ConsensusParametersVersions>()
.get(&block.header().consensus_parameters_version())?
.ok_or_else(|| {
SchedulerError::InternalError(
"Consensus parameters not found".to_string(),
)
})?
.into_owned()
};
// Initialize block executor
let mut executor = BlockExecutor::new(
self.relayer.clone(),
ExecutionOptions {
forbid_unauthorized_inputs: true,
forbid_fake_utxo: false,
backtrace: false,
},
consensus_parameters.clone(),
NoWaitTxs,
validator::NoPreconfirmationSender,
true, // dry run
)
.map_err(|e| {
SchedulerError::InternalError(format!("Failed to create executor: {e}"))
})?;
let validator = Validator::new(self.config.clone(), executor.clone());

let (gas_price, coinbase_contract_id) = transactions.coinbase()?;

let _event_root = self.process_l1_txs(
&mut partial_block,
coinbase_contract_id,
&mut data,
&mut memory,
&mut block_storage_tx,
&mut executor,
)?;
let processed_l1_tx_count = partial_block.transactions.len();

let components = Components {
header_to_produce: partial_block.header,
transactions_source: transactions.iter().cloned().skip(processed_l1_tx_count),
coinbase_recipient: coinbase_contract_id,
gas_price,
};

let executed_block_result = validator
.validate_block(components, block_storage_tx, block)
.await?;

Ok(executed_block_result)
}
}
3 changes: 3 additions & 0 deletions crates/services/parallel-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ pub mod config;
pub mod executor;
pub(crate) mod l1_execution_data;
pub mod ports;
pub(crate) mod txs_ext;

mod coin;
mod once_transaction_source;
mod tx_waiter;
mod validator;

#[cfg(test)]
mod tests;
Expand Down
17 changes: 12 additions & 5 deletions crates/services/parallel-executor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! This can be done because we assume that the transaction pool is sending us transactions that are alTransactionsReadyForPickup correctly verified.
//! If we have a transaction that end up being skipped (only possible cause if consensus parameters changes) then we will have to
//! fallback a sequential execution of the transaction that used the skipped one as a dependency.
mod coin;
mod contracts_changes;

use std::{
Expand Down Expand Up @@ -98,6 +97,10 @@ use tokio::runtime::Runtime;

use crate::{
checked_transaction_ext::CheckedTransactionExt,
coin::{
CoinDependencyChainVerifier,
CoinInBatch,
},
column_adapter::ContractColumnsIterator,
config::Config,
l1_execution_data::L1ExecutionData,
Expand All @@ -109,10 +112,6 @@ use crate::{
},
tx_waiter::NoWaitTxs,
};
use coin::{
CoinDependencyChainVerifier,
CoinInBatch,
};

pub struct Scheduler<R, S, PreconfirmationSender> {
/// Config
Expand Down Expand Up @@ -230,6 +229,14 @@ pub enum SchedulerError {
StorageError(StorageError),
/// Internal error
InternalError(String),
/// Mint missing error
MintMissing,
/// Skipped transaction error
SkippedTransaction(ExecutorError),
/// Block mismatch
BlockMismatch,
/// Consensus parameters not found
ConsensusParametersNotFound(u32),
}

impl From<StorageError> for SchedulerError {
Expand Down
25 changes: 25 additions & 0 deletions crates/services/parallel-executor/src/txs_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use fuel_core_types::fuel_tx::{
ContractId,
Transaction,
field::{
InputContract,
MintGasPrice,
},
};

use crate::scheduler::SchedulerError;

/// Returns the coinbase transaction details if it exists.
pub trait TxCoinbaseExt {
fn coinbase(&self) -> Result<(u64, ContractId), SchedulerError>;
}

impl TxCoinbaseExt for &[Transaction] {
fn coinbase(&self) -> Result<(u64, ContractId), SchedulerError> {
if let Some(Transaction::Mint(mint)) = self.last() {
Ok((*mint.gas_price(), mint.input_contract().contract_id))
} else {
Err(SchedulerError::MintMissing)
}
}
}
Loading
Loading