diff --git a/cmd/loadtest/cmd.go b/cmd/loadtest/cmd.go index 544eaf58..4b380bf4 100644 --- a/cmd/loadtest/cmd.go +++ b/cmd/loadtest/cmd.go @@ -37,6 +37,9 @@ var uniswapCfg = &config.UniswapV3Config{} // gasManagerCfg holds gas manager configuration. var gasManagerCfg = &config.GasManagerConfig{} +// preconfCfg holds preconf tracking configuration. +var preconfCfg = &config.PreconfConfig{} + // LoadtestCmd represents the loadtest command. var LoadtestCmd = &cobra.Command{ Use: "loadtest", @@ -60,6 +63,8 @@ var LoadtestCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { // Attach gas manager config. cfg.GasManager = gasManagerCfg + // Attach preconf config. + cfg.Preconf = preconfCfg return loadtest.Run(cmd.Context(), cfg) }, } @@ -78,6 +83,7 @@ var uniswapv3Cmd = &cobra.Command{ cfg.Modes = []string{"v3"} cfg.UniswapV3 = uniswapCfg cfg.GasManager = gasManagerCfg + cfg.Preconf = preconfCfg return loadtest.Run(cmd.Context(), cfg) }, @@ -121,8 +127,11 @@ func initPersistentFlags() { pf.BoolVar(&cfg.LegacyTxMode, "legacy", false, "send a legacy transaction instead of an EIP1559 transaction") pf.BoolVar(&cfg.FireAndForget, "fire-and-forget", false, "send transactions and load without waiting for it to be mined") pf.BoolVar(&cfg.FireAndForget, "send-only", false, "alias for --fire-and-forget") - pf.BoolVar(&cfg.CheckForPreconf, "check-preconf", false, "check for preconf status after sending tx") - pf.StringVar(&cfg.PreconfStatsFile, "preconf-stats-file", "", "path for preconf stats JSON output, updated every 2 seconds") + pf.BoolVar(&preconfCfg.Enabled, "check-preconf", false, "check for preconf status after sending tx") + pf.StringVar(&preconfCfg.StatsFile, "preconf-stats-file", "", "path for preconf stats JSON output, updated every 2 seconds") + pf.IntVar(&preconfCfg.BatchSize, "preconf-batch-size", 50, "transactions per batch RPC call for preconf tracking") + pf.DurationVar(&preconfCfg.PollInterval, "preconf-poll-interval", 100*time.Millisecond, "interval between batch polls for preconf tracking") + pf.DurationVar(&preconfCfg.Timeout, "preconf-timeout", time.Minute, "timeout for tracking each transaction") initGasManagerFlags() } diff --git a/doc/polycli_loadtest.md b/doc/polycli_loadtest.md index 623dafeb..0a3544d5 100644 --- a/doc/polycli_loadtest.md +++ b/doc/polycli_loadtest.md @@ -162,7 +162,10 @@ The codebase has a contract that used for load testing. It's written in Solidity --output-mode string format mode for summary output (json | text) (default "text") --output-raw-tx-only output raw signed transaction hex without sending (works with most modes except RPC and UniswapV3) --pre-fund-sending-accounts fund all sending accounts at start instead of on first use + --preconf-batch-size int transactions per batch RPC call for preconf tracking (default 100) + --preconf-poll-interval duration interval between batch polls for preconf tracking (default 500ms) --preconf-stats-file string path for preconf stats JSON output, updated every 2 seconds + --preconf-timeout duration timeout for tracking each transaction (default 1m0s) --priority-gas-price uint gas tip price for EIP-1559 transactions --private-key string hex encoded private key to use for sending transactions (default "42b6e34dc21598a807dc19d7784c71b2a7a01f6480dc6f58258f78e539f1a1fa") --proxy string use the proxy specified diff --git a/doc/polycli_loadtest_uniswapv3.md b/doc/polycli_loadtest_uniswapv3.md index 16bf0087..3255b34e 100644 --- a/doc/polycli_loadtest_uniswapv3.md +++ b/doc/polycli_loadtest_uniswapv3.md @@ -105,7 +105,10 @@ The command also inherits flags from parent commands. --nonce uint use this flag to manually set the starting nonce --output-mode string format mode for summary output (json | text) (default "text") --output-raw-tx-only output raw signed transaction hex without sending (works with most modes except RPC and UniswapV3) + --preconf-batch-size int transactions per batch RPC call for preconf tracking (default 100) + --preconf-poll-interval duration interval between batch polls for preconf tracking (default 500ms) --preconf-stats-file string path for preconf stats JSON output, updated every 2 seconds + --preconf-timeout duration timeout for tracking each transaction (default 1m0s) --pretty-logs output logs in pretty format instead of JSON (default true) --priority-gas-price uint gas tip price for EIP-1559 transactions --private-key string hex encoded private key to use for sending transactions (default "42b6e34dc21598a807dc19d7784c71b2a7a01f6480dc6f58258f78e539f1a1fa") diff --git a/loadtest/config/config.go b/loadtest/config/config.go index 67ecc023..c7d1b175 100644 --- a/loadtest/config/config.go +++ b/loadtest/config/config.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/0xPolygon/polygon-cli/loadtest/uniswapv3" "github.com/ethereum/go-ethereum/common" @@ -51,11 +52,9 @@ type Config struct { ToAddress string EthAmountInWei uint64 RandomRecipients bool - LegacyTxMode bool - FireAndForget bool - CheckForPreconf bool - PreconfStatsFile string - WaitForReceipt bool + LegacyTxMode bool + FireAndForget bool + WaitForReceipt bool ReceiptRetryMax uint ReceiptRetryDelay uint // initial delay in milliseconds OutputRawTxOnly bool @@ -117,6 +116,9 @@ type Config struct { // Gas manager config (optional, for gas oscillation features) GasManager *GasManagerConfig + // Preconf tracking config + Preconf *PreconfConfig + // Computed fields (populated during initialization) CurrentGasPrice *big.Int CurrentGasTipCap *big.Int @@ -147,6 +149,15 @@ type GasManagerConfig struct { DynamicGasPricesVariation float64 // ±percentage variation for dynamic } +// PreconfConfig holds preconf tracking configuration. +type PreconfConfig struct { + Enabled bool // enable preconf tracking + StatsFile string // path for stats JSON output + BatchSize int // number of transactions per batch RPC call + PollInterval time.Duration // interval between batch polls + Timeout time.Duration // timeout for tracking each transaction +} + // UniswapV3Config holds UniswapV3-specific configuration. type UniswapV3Config struct { // Pre-deployed contract addresses (as hex strings). diff --git a/loadtest/preconf.go b/loadtest/preconf.go index 876ab166..5d2a23fc 100644 --- a/loadtest/preconf.go +++ b/loadtest/preconf.go @@ -4,14 +4,16 @@ import ( "context" "encoding/json" "os" + "strings" "sync" "sync/atomic" "time" - "github.com/0xPolygon/polygon-cli/util" + "github.com/0xPolygon/polygon-cli/loadtest/config" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + ethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/montanaflynn/stats" "github.com/rs/zerolog/log" ) @@ -28,16 +30,24 @@ type PreconfTxResult struct { // PreconfSummary holds aggregate stats from the preconf tracker. type PreconfSummary struct { - TotalTasks uint64 `json:"total_tasks"` - PreconfSuccess uint64 `json:"preconf_success"` - PreconfFail uint64 `json:"preconf_fail"` - BothFailed uint64 `json:"both_failed"` - IneffectivePreconf uint64 `json:"ineffective_preconf"` - FalsePositives uint64 `json:"false_positives"` - Confidence uint64 `json:"confidence"` - ReceiptSuccess uint64 `json:"receipt_success"` - ReceiptFail uint64 `json:"receipt_fail"` - TotalGasUsed uint64 `json:"total_gas_used"` + TotalTasks uint64 `json:"total_tasks"` + + // Preconf totals + PreconfSuccess uint64 `json:"preconf_success"` // preconf=true + PreconfFail uint64 `json:"preconf_fail"` // preconf=false + + // Receipt totals + ReceiptSuccess uint64 `json:"receipt_success"` // receipt=yes + ReceiptFail uint64 `json:"receipt_fail"` // receipt=no + + // 2x2 outcome matrix + BothConfirmed uint64 `json:"both_confirmed"` // preconf=true AND receipt=yes + PreconfOnly uint64 `json:"preconf_only"` // preconf=true AND receipt=no + ReceiptOnly uint64 `json:"receipt_only"` // preconf=false AND receipt=yes + NeitherConfirmed uint64 `json:"neither_confirmed"` // preconf=false AND receipt=no + + Confidence uint64 `json:"confidence"` // both_confirmed AND block_diff < 10 + TotalGasUsed uint64 `json:"total_gas_used"` // Preconf duration percentiles (milliseconds) PreconfP50 float64 `json:"preconf_p50,omitempty"` @@ -60,133 +70,398 @@ type PreconfStats struct { Transactions []PreconfTxResult `json:"transactions"` } +// trackedTx holds tracking state for a pending transaction. +// Resolution state is determined by non-nil fields: +// - receipt != nil OR receiptError != nil means receipt is resolved +// - preconfResult != nil OR preconfError != nil means preconf is resolved +type trackedTx struct { + hash common.Hash + registeredAt time.Time + startBlock uint64 + // Receipt resolution - exactly one of (receipt, receiptError) will be set when resolved + receipt *types.Receipt + receiptTime time.Duration + receiptError error + // Preconf resolution - exactly one of (preconfResult, preconfError) will be set when resolved + preconfResult *bool + preconfTime time.Duration + preconfError error +} + +// receiptResolved returns true if the receipt has been resolved (success or error). +func (tx *trackedTx) receiptResolved() bool { + return tx.receipt != nil || tx.receiptError != nil +} + +// preconfResolved returns true if the preconf status has been resolved (success or error). +func (tx *trackedTx) preconfResolved() bool { + return tx.preconfResult != nil || tx.preconfError != nil +} + +// PreconfTracker tracks preconf and receipt status using centralized batch polling. type PreconfTracker struct { - client *ethclient.Client - statsFilePath string - - // preconf metrics - preconfSuccess atomic.Uint64 - preconfFail atomic.Uint64 - totalTasks atomic.Uint64 - bothFailedCount atomic.Uint64 - ineffectivePreconf atomic.Uint64 - falsePositiveCount atomic.Uint64 - confidence atomic.Uint64 - - // receipt metrics + client *ethclient.Client + rpc *ethrpc.Client + cfg *config.PreconfConfig + + // Pending transactions awaiting receipt/preconf + pendingMu sync.RWMutex + pending map[common.Hash]*trackedTx + + // Completed transaction results + completedMu sync.Mutex + completed []*trackedTx + + // Metrics (atomic for lock-free access) + totalTasks atomic.Uint64 + preconfSuccess atomic.Uint64 + preconfFail atomic.Uint64 receiptSuccess atomic.Uint64 receiptFail atomic.Uint64 - totalGasUsed atomic.Uint64 - mu sync.Mutex - txResults []PreconfTxResult + // 2x2 outcome matrix + bothConfirmed atomic.Uint64 + preconfOnly atomic.Uint64 + receiptOnly atomic.Uint64 + neitherConfirmed atomic.Uint64 + + confidence atomic.Uint64 + totalGasUsed atomic.Uint64 + + // Cached block number to reduce RPC calls + cachedBlock atomic.Uint64 + cachedBlockTime atomic.Int64 + + // Shutdown coordination + wg sync.WaitGroup } -func NewPreconfTracker(client *ethclient.Client, statsFilePath string) *PreconfTracker { +// NewPreconfTracker creates a new PreconfTracker. +func NewPreconfTracker(client *ethclient.Client, rpcClient *ethrpc.Client, cfg *config.PreconfConfig) *PreconfTracker { return &PreconfTracker{ - client: client, - statsFilePath: statsFilePath, - txResults: make([]PreconfTxResult, 0, 1024), + client: client, + rpc: rpcClient, + cfg: cfg, + pending: make(map[common.Hash]*trackedTx), + completed: make([]*trackedTx, 0, 1024), } } -func (pt *PreconfTracker) Track(txHash common.Hash) { - currentBlock, err := pt.client.BlockNumber(context.Background()) - if err != nil { +// Start begins the batch polling loops for receipts and preconf status. +func (pt *PreconfTracker) Start(ctx context.Context) { + // Start receipt poller + pt.wg.Add(1) + go pt.receiptPoller(ctx) + + // Start preconf poller + pt.wg.Add(1) + go pt.preconfPoller(ctx) + + // Start stats file writer if path is configured + if pt.cfg.StatsFile != "" { + pt.wg.Add(1) + go pt.statsWriter(ctx) + } +} + +// getBlockNumber returns the current block number, cached to reduce RPC calls. +// The cache is refreshed at most once per second. +func (pt *PreconfTracker) getBlockNumber() uint64 { + now := time.Now().UnixNano() + lastUpdate := pt.cachedBlockTime.Load() + if now-lastUpdate < int64(time.Second) { + return pt.cachedBlock.Load() + } + if pt.cachedBlockTime.CompareAndSwap(lastUpdate, now) { + if block, err := pt.client.BlockNumber(context.Background()); err == nil { + pt.cachedBlock.Store(block) + } + } + return pt.cachedBlock.Load() +} + +// RegisterTx adds a transaction hash to be tracked. Non-blocking. +func (pt *PreconfTracker) RegisterTx(hash common.Hash) { + pt.pendingMu.Lock() + pt.pending[hash] = &trackedTx{ + hash: hash, + registeredAt: time.Now(), + startBlock: pt.getBlockNumber(), + } + pt.pendingMu.Unlock() +} + +// receiptPoller polls for receipts in batches. +func (pt *PreconfTracker) receiptPoller(ctx context.Context) { + defer pt.wg.Done() + + ticker := time.NewTicker(pt.cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pt.pollReceipts(ctx) + } + } +} + +// preconfPoller polls for preconf status in batches. +func (pt *PreconfTracker) preconfPoller(ctx context.Context) { + defer pt.wg.Done() + + ticker := time.NewTicker(pt.cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pt.pollPreconf(ctx) + } + } +} + +// statsWriter periodically writes stats to file. +func (pt *PreconfTracker) statsWriter(ctx context.Context) { + defer pt.wg.Done() + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pt.writeStats() + } + } +} + +// pollReceipts fetches receipts for pending transactions in batches. +func (pt *PreconfTracker) pollReceipts(ctx context.Context) { + pt.pendingMu.RLock() + hashes := make([]common.Hash, 0, len(pt.pending)) + for hash, tx := range pt.pending { + if !tx.receiptResolved() { + hashes = append(hashes, hash) + } + } + pt.pendingMu.RUnlock() + + if len(hashes) == 0 { + return + } + + // Process in batches + for i := 0; i < len(hashes); i += pt.cfg.BatchSize { + end := min(i+pt.cfg.BatchSize, len(hashes)) + batch := hashes[i:end] + pt.getReceipts(ctx, batch) + } + + // Check for completed/timed out transactions + pt.checkTx() +} + +// pollPreconf fetches preconf status for pending transactions in batches. +func (pt *PreconfTracker) pollPreconf(ctx context.Context) { + pt.pendingMu.RLock() + hashes := make([]common.Hash, 0, len(pt.pending)) + for hash, tx := range pt.pending { + if !tx.preconfResolved() { + hashes = append(hashes, hash) + } + } + pt.pendingMu.RUnlock() + + if len(hashes) == 0 { + return + } + + // Process in batches + for i := 0; i < len(hashes); i += pt.cfg.BatchSize { + end := min(i+pt.cfg.BatchSize, len(hashes)) + batch := hashes[i:end] + pt.getPreconfs(ctx, batch) + } +} + +// getReceipts fetches multiple receipts in a single batch RPC call. +func (pt *PreconfTracker) getReceipts(ctx context.Context, hashes []common.Hash) { + if len(hashes) == 0 { + return + } + + // Create batch elements + batch := make([]ethrpc.BatchElem, len(hashes)) + receipts := make([]*types.Receipt, len(hashes)) + + for i, hash := range hashes { + receipts[i] = new(types.Receipt) + batch[i] = ethrpc.BatchElem{ + Method: "eth_getTransactionReceipt", + Args: []any{hash}, + Result: receipts[i], + } + } + + // Execute batch call + if err := pt.rpc.BatchCallContext(ctx, batch); err != nil { + log.Warn().Err(err).Int("count", len(hashes)).Msg("Batch receipt call failed") + return + } + + // Process results + pt.pendingMu.Lock() + now := time.Now() + for i, hash := range hashes { + tx, exists := pt.pending[hash] + if !exists { + continue + } + + if batch[i].Error != nil { + // "missing required field" errors typically mean the tx isn't mined yet + // (some RPCs return {} instead of null for pending txs) + // Don't treat as permanent error - continue polling + if strings.Contains(batch[i].Error.Error(), "missing required field") { + continue + } + tx.receiptError = batch[i].Error + log.Warn().Err(batch[i].Error).Str("hash", hash.Hex()).Msg("Receipt batch element error") + continue + } + + // Check if receipt is nil (not yet mined) + if receipts[i] == nil || receipts[i].BlockNumber == nil { + continue + } + + tx.receipt = receipts[i] + tx.receiptTime = now.Sub(tx.registeredAt) + } + pt.pendingMu.Unlock() +} + +// getPreconfs checks preconf status for multiple transactions in a single batch RPC call. +func (pt *PreconfTracker) getPreconfs(ctx context.Context, hashes []common.Hash) { + if len(hashes) == 0 { return } - // wait for preconf - var wg sync.WaitGroup - var preconfStatus bool - var preconfError error - var preconfDuration time.Duration - wg.Add(1) - go func() { - defer wg.Done() + // Create batch elements + batch := make([]ethrpc.BatchElem, len(hashes)) + results := make([]bool, len(hashes)) - preconfStartTime := time.Now() - defer func() { - preconfDuration = time.Since(preconfStartTime) - }() + for i, hash := range hashes { + batch[i] = ethrpc.BatchElem{ + Method: "eth_checkPreconfStatus", + Args: []any{hash.Hex()}, + Result: &results[i], + } + } + + // Execute batch call + if err := pt.rpc.BatchCallContext(ctx, batch); err != nil { + log.Warn().Err(err).Int("count", len(hashes)).Msg("Batch preconf call failed") + return + } + + // Process results + pt.pendingMu.Lock() + now := time.Now() + for i, hash := range hashes { + tx, exists := pt.pending[hash] + if !exists { + continue + } - preconfStatus, preconfError = util.WaitPreconf(context.Background(), pt.client, txHash, time.Minute) - }() + if batch[i].Error != nil { + tx.preconfError = batch[i].Error + log.Warn().Err(batch[i].Error).Str("hash", hash.Hex()).Msg("Preconf batch element error") + continue + } - // wait for receipt - var receipt *types.Receipt - var receiptError error - var receiptDuration time.Duration - wg.Add(1) - go func() { - defer wg.Done() + // Record preconf result (true = confirmed, false = not confirmed) + tx.preconfResult = &results[i] + tx.preconfTime = now.Sub(tx.registeredAt) + } + pt.pendingMu.Unlock() +} - time.Sleep(100 * time.Millisecond) +// checkTx moves completed or timed out transactions to the completed list. +func (pt *PreconfTracker) checkTx() { + now := time.Now() - receiptTime := time.Now() - defer func() { - receiptDuration = time.Since(receiptTime) - }() + pt.pendingMu.Lock() + var toRemove []common.Hash - receipt, receiptError = util.WaitReceiptWithTimeout(context.Background(), pt.client, txHash, time.Minute) - }() + for hash, tx := range pt.pending { + timedOut := now.Sub(tx.registeredAt) >= pt.cfg.Timeout + receiptDone := tx.receiptResolved() || timedOut + preconfDone := tx.preconfResolved() || timedOut - wg.Wait() + if receiptDone && preconfDone { + toRemove = append(toRemove, hash) + pt.recordMetrics(tx) + } + } - // Build per-transaction result - result := PreconfTxResult{ - TxHash: txHash.Hex(), + for _, hash := range toRemove { + delete(pt.pending, hash) } + pt.pendingMu.Unlock() +} +// recordMetrics records the final metrics for a completed transaction. +func (pt *PreconfTracker) recordMetrics(tx *trackedTx) { pt.totalTasks.Add(1) - if preconfStatus { + + preconfTrue := tx.preconfResult != nil && *tx.preconfResult + receiptOK := tx.receipt != nil + + // Track preconf totals + if preconfTrue { pt.preconfSuccess.Add(1) - result.PreconfDurationMs = preconfDuration.Milliseconds() } else { pt.preconfFail.Add(1) } - // Track receipt metrics - if receiptError == nil { + // Track receipt totals + if receiptOK { pt.receiptSuccess.Add(1) - pt.totalGasUsed.Add(receipt.GasUsed) - result.ReceiptDurationMs = receiptDuration.Milliseconds() - result.GasUsed = receipt.GasUsed - result.Status = receipt.Status - result.BlockDiff = receipt.BlockNumber.Uint64() - currentBlock + pt.totalGasUsed.Add(tx.receipt.GasUsed) } else { pt.receiptFail.Add(1) } - // Append result under lock - pt.mu.Lock() - pt.txResults = append(pt.txResults, result) - pt.mu.Unlock() - + // Track 2x2 matrix switch { - case preconfError != nil && receiptError != nil: - // Both failed: no tx inclusion in txpool or block - pt.bothFailedCount.Add(1) - - case preconfError == nil && receiptError != nil: - // False positive: preconf said tx is included but never got executed - pt.falsePositiveCount.Add(1) - - case preconfError != nil && receiptError == nil: - // Receipt arrived but preconf failed: preconf wasn't effective - pt.ineffectivePreconf.Add(1) - - case preconfError == nil && receiptError == nil: - // Both succeeded - if preconfDuration > receiptDuration { - // Receipt arrived before preconf: preconf wasn't effective - pt.ineffectivePreconf.Add(1) - } - // Track confidence (block diff < 10) - if result.BlockDiff < 10 { - pt.confidence.Add(1) + case preconfTrue && receiptOK: + pt.bothConfirmed.Add(1) + if tx.startBlock > 0 { + blockDiff := tx.receipt.BlockNumber.Uint64() - tx.startBlock + if blockDiff < 10 { + pt.confidence.Add(1) + } } + case preconfTrue && !receiptOK: + pt.preconfOnly.Add(1) + case !preconfTrue && receiptOK: + pt.receiptOnly.Add(1) + case !preconfTrue && !receiptOK: + pt.neitherConfirmed.Add(1) } + + // Add to completed list for stats + pt.completedMu.Lock() + pt.completed = append(pt.completed, tx) + pt.completedMu.Unlock() } // Percentiles holds p50, p75, p90, p95, p99 values. @@ -198,9 +473,9 @@ type Percentiles struct { P99 float64 } -// calculatePercentiles computes p50, p75, p90, p95, p99 for a slice of durations. +// percentiles computes p50, p75, p90, p95, p99 for a slice of durations. // Returns zero values if the input slice is empty. -func calculatePercentiles(durations []float64) Percentiles { +func percentiles(durations []float64) Percentiles { if len(durations) == 0 { return Percentiles{} } @@ -212,31 +487,15 @@ func calculatePercentiles(durations []float64) Percentiles { return Percentiles{P50: p50, P75: p75, P90: p90, P95: p95, P99: p99} } -// Start begins periodic stats file writing every 2 seconds until context is cancelled. -func (pt *PreconfTracker) Start(ctx context.Context) { - if pt.statsFilePath == "" { - return - } - go func() { - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - pt.writeStatsFile() - case <-ctx.Done(): - return - } - } - }() -} - // Stats logs the final summary and writes the stats file. func (pt *PreconfTracker) Stats() { + // Finalize any remaining pending transactions + pt.finalizePending() + output := pt.buildStats() log.Info().Any("summary", output.Summary).Msg("Preconf tracker stats") - if pt.statsFilePath == "" { + if pt.cfg.StatsFile == "" { return } data, err := json.MarshalIndent(output, "", " ") @@ -244,66 +503,95 @@ func (pt *PreconfTracker) Stats() { log.Error().Err(err).Msg("Failed to marshal preconf stats") return } - if err := os.WriteFile(pt.statsFilePath, data, 0644); err != nil { + if err := os.WriteFile(pt.cfg.StatsFile, data, 0644); err != nil { log.Error().Err(err).Msg("Failed to write preconf stats file") } } -func (pt *PreconfTracker) writeStatsFile() { +// finalizePending moves all remaining pending transactions to completed (as timed out). +func (pt *PreconfTracker) finalizePending() { + pt.pendingMu.Lock() + for _, tx := range pt.pending { + pt.recordMetrics(tx) + } + pt.pending = make(map[common.Hash]*trackedTx) + pt.pendingMu.Unlock() +} + +func (pt *PreconfTracker) writeStats() { data, err := json.MarshalIndent(pt.buildStats(), "", " ") if err != nil { log.Error().Err(err).Msg("Failed to marshal preconf stats") return } - if err := os.WriteFile(pt.statsFilePath, data, 0644); err != nil { + if err := os.WriteFile(pt.cfg.StatsFile, data, 0644); err != nil { log.Error().Err(err).Msg("Failed to write preconf stats file") } } func (pt *PreconfTracker) buildStats() PreconfStats { - pt.mu.Lock() - txResults := make([]PreconfTxResult, len(pt.txResults)) - copy(txResults, pt.txResults) - pt.mu.Unlock() - - var preconfDurations, receiptDurations []float64 - for _, tx := range txResults { - if tx.PreconfDurationMs > 0 { - preconfDurations = append(preconfDurations, float64(tx.PreconfDurationMs)) + pt.completedMu.Lock() + completed := make([]*trackedTx, len(pt.completed)) + copy(completed, pt.completed) + pt.completedMu.Unlock() + + var pd, rd []float64 + txs := make([]PreconfTxResult, 0, len(completed)) + + for _, tx := range completed { + result := PreconfTxResult{ + TxHash: tx.hash.Hex(), } - if tx.ReceiptDurationMs > 0 { - receiptDurations = append(receiptDurations, float64(tx.ReceiptDurationMs)) + + if tx.preconfResult != nil && *tx.preconfResult { + result.PreconfDurationMs = tx.preconfTime.Milliseconds() + pd = append(pd, float64(result.PreconfDurationMs)) } + + if tx.receipt != nil { + result.ReceiptDurationMs = tx.receiptTime.Milliseconds() + result.GasUsed = tx.receipt.GasUsed + result.Status = tx.receipt.Status + if tx.startBlock > 0 { + result.BlockDiff = tx.receipt.BlockNumber.Uint64() - tx.startBlock + } + rd = append(rd, float64(result.ReceiptDurationMs)) + } + + txs = append(txs, result) } - preconfPct := calculatePercentiles(preconfDurations) - receiptPct := calculatePercentiles(receiptDurations) + pp := percentiles(pd) + rp := percentiles(rd) return PreconfStats{ Summary: PreconfSummary{ - TotalTasks: pt.totalTasks.Load(), - PreconfSuccess: pt.preconfSuccess.Load(), - PreconfFail: pt.preconfFail.Load(), - BothFailed: pt.bothFailedCount.Load(), - IneffectivePreconf: pt.ineffectivePreconf.Load(), - FalsePositives: pt.falsePositiveCount.Load(), - Confidence: pt.confidence.Load(), - ReceiptSuccess: pt.receiptSuccess.Load(), - ReceiptFail: pt.receiptFail.Load(), - TotalGasUsed: pt.totalGasUsed.Load(), - - PreconfP50: preconfPct.P50, - PreconfP75: preconfPct.P75, - PreconfP90: preconfPct.P90, - PreconfP95: preconfPct.P95, - PreconfP99: preconfPct.P99, - - ReceiptP50: receiptPct.P50, - ReceiptP75: receiptPct.P75, - ReceiptP90: receiptPct.P90, - ReceiptP95: receiptPct.P95, - ReceiptP99: receiptPct.P99, + TotalTasks: pt.totalTasks.Load(), + PreconfSuccess: pt.preconfSuccess.Load(), + PreconfFail: pt.preconfFail.Load(), + ReceiptSuccess: pt.receiptSuccess.Load(), + ReceiptFail: pt.receiptFail.Load(), + + BothConfirmed: pt.bothConfirmed.Load(), + PreconfOnly: pt.preconfOnly.Load(), + ReceiptOnly: pt.receiptOnly.Load(), + NeitherConfirmed: pt.neitherConfirmed.Load(), + + Confidence: pt.confidence.Load(), + TotalGasUsed: pt.totalGasUsed.Load(), + + PreconfP50: pp.P50, + PreconfP75: pp.P75, + PreconfP90: pp.P90, + PreconfP95: pp.P95, + PreconfP99: pp.P99, + + ReceiptP50: rp.P50, + ReceiptP75: rp.P75, + ReceiptP90: rp.P90, + ReceiptP95: rp.P95, + ReceiptP99: rp.P99, }, - Transactions: txResults, + Transactions: txs, } } diff --git a/loadtest/runner.go b/loadtest/runner.go index b6d8cfec..b6b094b2 100644 --- a/loadtest/runner.go +++ b/loadtest/runner.go @@ -129,8 +129,8 @@ func (r *Runner) Init(ctx context.Context) error { } // Initialize preconf tracker if configured - if r.cfg.CheckForPreconf { - r.preconfTracker = NewPreconfTracker(r.client, r.cfg.PreconfStatsFile) + if r.cfg.Preconf != nil && r.cfg.Preconf.Enabled { + r.preconfTracker = NewPreconfTracker(r.client, r.rpcClient, r.cfg.Preconf) r.preconfTracker.Start(ctx) log.Info().Msg("Preconf tracker initialized") } @@ -624,8 +624,8 @@ func (r *Runner) mainLoop(ctx context.Context) error { } // Track preconf if configured - if tErr == nil && cfg.CheckForPreconf && r.preconfTracker != nil { - go r.preconfTracker.Track(ltTxHash) + if tErr == nil && r.preconfTracker != nil { + r.preconfTracker.RegisterTx(ltTxHash) } // Wait for receipt if configured