Conversation
Manual Deploy Available: you can trigger a manual deploy of this PR branch to testnet. This comment is updated automatically when the PR is synchronized.
📝 Walkthrough: adds an in-memory per-task retry mechanism to the task scheduler.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a2f027115f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-task-scheduler/src/lib.rs`:
- Line 7: Remove the test-only constant re-export from the crate root: the line
currently exporting TASK_EXECUTION_RETRY_LIMIT alongside TaskSchedulerService
should only expose TaskSchedulerService at the top level, so delete
TASK_EXECUTION_RETRY_LIMIT from the pub use in this file (leave pub use
service::TaskSchedulerService). If tests need the constant, reference it via
service::TASK_EXECUTION_RETRY_LIMIT or add a cfg(test) re-export instead.
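A minimal sketch of the crate-root layout this comment suggests. The `service` module body below is a stand-in, and the constant's value is assumed for illustration; only the re-export shape matters:

```rust
// Hypothetical sketch of the suggested lib.rs layout.
// The `service` module here stands in for the real one; the constant's
// value (3) is assumed purely for illustration.
mod service {
    pub const TASK_EXECUTION_RETRY_LIMIT: u32 = 3; // assumed value
    pub struct TaskSchedulerService;
}

// Only the service type stays at the crate root.
pub use service::TaskSchedulerService;

// Tests reach the constant through the module path, or via a
// test-only re-export that does not widen the public API.
#[cfg(test)]
pub use service::TASK_EXECUTION_RETRY_LIMIT;

fn main() {
    // Production code paths use the fully qualified path.
    assert_eq!(service::TASK_EXECUTION_RETRY_LIMIT, 3);
    println!("ok");
}
```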
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 315-317: The current retry logic calls execute_task() (which calls
process_transaction() and uses tx_counter.fetch_add() to mint a new transaction)
for every error via retry_task_execution(), causing duplicate distinct
transactions on ambiguous or post-submit failures; change the flow so retries
only occur for errors provably before send_transaction() (i.e., pre-submission
errors returned from process_transaction()) or, for errors after submission
(ambiguous send/confirm/update failures like those from send_transaction() or
db.update_task_after_execution()), do not call execute_task() again but instead
rebroadcast/confirm the original signature or re-run the persistence/reschedule
path without creating a new tx. Concretely: add error classification in
execute_task() / process_transaction() to return a variant (PreSendError vs
PostSendError with optional signature), have retry_task_execution() retry only
on PreSendError, and for PostSendError use the returned signature to rebroadcast
or re-run db.update_task_after_execution() rather than invoking
tx_counter.fetch_add() and creating a new transaction; apply the same change to
the other retry site around lines 353–369.
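The pre-send versus post-send split described above could be sketched roughly as follows. The `ExecutionError` variants, `retry_action` helper, and the string-based `Signature` alias are all hypothetical names, not the crate's real types:

```rust
// Hypothetical error classification for the retry flow described in the
// review comment. `Signature` is a stand-in for the real signature type.
type Signature = String;

#[derive(Debug)]
enum ExecutionError {
    // Failed before send_transaction(): safe to rebuild and resubmit.
    PreSend(String),
    // Failed during or after submission: the tx may have landed, so only
    // confirm/rebroadcast the original signature, never mint a new one.
    PostSend { signature: Option<Signature>, reason: String },
}

// Decide what the retry loop may do for a given failure.
fn retry_action(err: &ExecutionError) -> &'static str {
    match err {
        ExecutionError::PreSend(_) => "rebuild-and-resend",
        ExecutionError::PostSend { signature: Some(_), .. } => "confirm-or-rebroadcast",
        ExecutionError::PostSend { signature: None, .. } => "give-up",
    }
}

fn main() {
    let pre = ExecutionError::PreSend("blockhash fetch failed".into());
    let post = ExecutionError::PostSend {
        signature: Some("5xyz".into()),
        reason: "confirmation timeout".into(),
    };
    assert_eq!(retry_action(&pre), "rebuild-and-resend");
    assert_eq!(retry_action(&post), "confirm-or-rebroadcast");
    println!("ok");
}
```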
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0c12c1d8-a1d9-4eb4-bc74-10b6a58870cd
⛔ Files ignored due to path filters (1)
test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
- magicblock-task-scheduler/src/lib.rs
- magicblock-task-scheduler/src/service.rs
- test-integration/test-task-scheduler/tests/test_schedule_error.rs
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ca0b4a5f8f
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b59c8aea9e
```rust
self.db
    .update_task_after_execution(task.id, execution_started_at)
    .await?;
```
Prevent DB write failures from aborting the scheduler loop
Using `?` on `update_task_after_execution` in the success path causes `run()` to return early on any transient SQLite error (for example, a temporary "database is locked"), which stops processing all future tasks instead of handling the failure for just this task. Before this change, execution errors were contained in the task branch; with this change, a single metadata write failure can take down the entire scheduler until restart.
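One way to contain the failure is to match on the `Result` instead of propagating it. A rough sketch, using synchronous stand-ins for the async DB call (the real service is async and its error type differs):

```rust
// Hypothetical sketch: contain a per-task DB write failure instead of
// propagating it with `?` and killing the scheduler's run() loop.
// This stand-in always fails, simulating a transient SQLite error.
fn update_task_after_execution(task_id: i64) -> Result<(), String> {
    let _ = task_id;
    Err("database is locked".into())
}

// Returns whether the metadata write succeeded; either way the caller's
// loop keeps running.
fn handle_success_path(task_id: i64) -> bool {
    if let Err(e) = update_task_after_execution(task_id) {
        // Log and move on; a later tick can reconcile the missed write.
        eprintln!("task {task_id}: metadata update failed, will retry later: {e}");
        return false;
    }
    true
}

fn main() {
    assert!(!handle_success_path(42));
    println!("scheduler loop still alive");
}
```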
```rust
let retry_delay =
    Duration::from_millis(task.execution_interval_millis as u64);
```
Decouple retry backoff from the task's normal interval
The retry delay is set to execution_interval_millis, so tasks configured with long cadences (minutes/hours) do not retry until long after the original blockhash has expired; when that happens, the later blockhash check path treats the task as terminally failed. This means the new retry mechanism effectively provides little or no recovery window for long-interval tasks, even when the original send error was transient.
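A decoupled backoff might look like the sketch below. The 400 ms slot time and 150-slot blockhash lifetime are Solana defaults used here as assumptions, and `retry_delay` is a hypothetical helper, not the service's real API:

```rust
use std::time::Duration;

// Assumed Solana defaults, used only for illustration.
const SLOT_MILLIS: u64 = 400;
const BLOCKHASH_VALID_SLOTS: u64 = 150;

// Hypothetical backoff policy: retry on a slot-sized delay while the
// original blockhash is still plausibly valid, then fall back to the
// task's regular cadence.
fn retry_delay(attempt: u64, execution_interval_millis: u64) -> Duration {
    let elapsed_estimate = attempt * SLOT_MILLIS;
    if elapsed_estimate < BLOCKHASH_VALID_SLOTS * SLOT_MILLIS {
        // Quick retry while still inside the blockhash window.
        Duration::from_millis(SLOT_MILLIS)
    } else {
        // Window spent: resume the configured interval.
        Duration::from_millis(execution_interval_millis)
    }
}

fn main() {
    // Early attempts retry fast even for an hourly task...
    assert_eq!(retry_delay(1, 3_600_000), Duration::from_millis(400));
    // ...but once the blockhash window is spent, fall back to the interval.
    assert_eq!(retry_delay(200, 3_600_000), Duration::from_millis(3_600_000));
    println!("ok");
}
```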
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 265-267: The retry delay currently uses
task.execution_interval_millis (retry_delay =
Duration::from_millis(task.execution_interval_millis as u64)), which causes
retries to wait the full task interval and miss the blockhash window; change the
retry/backoff logic used around send_result so that transient send failures are
retried quickly using a short slot-sized backoff (e.g., a constant small
Duration representing a slot or a few hundred milliseconds) while the blockhash
is expected to be valid, and only after success (or exhausting blockhash
validity) resume using task.execution_interval_millis for the next scheduled
run; update the other identical occurrence around the send/dispatch path as well
so both retry_delay usages (the retry_delay variable and the send retry loop)
use the new short backoff for immediate retries and preserve
task.execution_interval_millis for the regular cadence.
- Around line 268-271: The retry path currently uses .expect() on
self.pending_executions.get(&task.id) (binding to execution) and similar
.expect()/unwrap() uses at the other occurrences; replace these panics with
explicit error handling: check whether get(&task.id) returns Some and if not
return a recoverable Err from the enclosing function (or log and skip the retry)
so the scheduler does not abort; update the function signature to propagate a
Result if needed, or map the missing-entry case to a clear error variant (with
task.id included) and handle it the same way at the other locations referenced
(the blocks around the current pending_executions.get(&task.id) usage and the
occurrences noted at the other ranges).
- Around line 344-347: When registering a new task the code calls
self.task_queue.insert(task.clone(), Duration::from_millis(0)) but discards the
returned Key, so remove_task_from_queue and re-registration can't find and
cancel the queued entry; change the flow in the block that calls
self.pending_executions.remove(&task.id); self.db.insert_task(&task).await?;
self.task_queue.insert(...) to capture the returned Key (from task_queue.insert)
and store it in the scheduler's queue-key map keyed by task.id (e.g., update or
add an entry in the same structure used by
remove_task_from_queue/process_request), ensuring subsequent
remove_task_from_queue can lookup and remove the right queued item and that
re-registering replaces the old Key.
- Around line 55-56: pending_executions is currently in-memory so in-flight
retries are lost across restarts causing prepare_execution() to mint a new
signature and reintroduce duplicate submits when send_transaction() returned an
ambiguous error; persist PendingExecution state to the DB, add a table/column to
store the PendingExecution (including task_id, original_signature/tx_hash,
created_at, and retry metadata), update the code paths that modify
pending_executions to write-through to the DB (e.g., when creating a
PendingExecution in prepare_execution() and when clearing it after
confirmation), and modify start() to load pending executions from the DB into
pending_executions on startup so prepare_execution() checks the persisted
pending entry and reuses the original signature/tx tracking instead of minting a
fresh one.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 22f6bdb6-9501-4f8d-99e9-f65d123e90dc
📒 Files selected for processing (1)
magicblock-task-scheduler/src/service.rs
```rust
/// Retry state for tasks whose current execution is still in flight
pending_executions: HashMap<i64, PendingExecution>,
```
Persist in-flight retries across restarts.
pending_executions only lives in memory. If send_transaction() returns an ambiguous error and the scheduler restarts before the blockhash expires, start() will reload the task from the DB and prepare_execution() will mint a fresh signature even though the original tx may still land. That reintroduces the duplicate-submit hazard this PR is trying to avoid.
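A persistence sketch along these lines could start from a small SQLite table. The table name and columns below are assumptions drawn from the comment, shown as the DDL the scheduler's SQLite layer might run:

```rust
// Hypothetical write-through persistence for PendingExecution.
// Table and column names are assumptions, not the crate's real schema.
const CREATE_PENDING_EXECUTIONS: &str = "
CREATE TABLE IF NOT EXISTS pending_executions (
    task_id     INTEGER PRIMARY KEY,
    signature   TEXT NOT NULL,      -- original tx signature to rebroadcast
    created_at  INTEGER NOT NULL,   -- unix millis, for blockhash-expiry checks
    retry_count INTEGER NOT NULL DEFAULT 0
);
";

fn main() {
    // In the real service this would run through the db handle on startup,
    // and start() would then load rows back into pending_executions so
    // prepare_execution() reuses the persisted signature.
    assert!(CREATE_PENDING_EXECUTIONS.contains("pending_executions"));
    println!("ok");
}
```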
```rust
let retry_delay =
    Duration::from_millis(task.execution_interval_millis as u64);
let send_result = {
```
Use a short retry backoff, not the task interval.
Line 265 ties retry timing to execution_interval_millis. For any task scheduled less frequently than the blockhash lifetime, the first retry happens after the original blockhash is already dead, so transient send failures never get a real retry. This should retry on a short slot-sized backoff while the blockhash is still valid, then return to normal cadence only after success.
Also applies to: 322-322
```rust
let execution = self
    .pending_executions
    .get(&task.id)
    .expect("pending execution was initialized");
```
Replace expect() in the retry path.
These panics can take down the scheduler on an internal state mismatch instead of surfacing a recoverable scheduler error. Please convert them to explicit error handling, or document the invariant without panicking in release builds.
As per coding guidelines {magicblock-*,programs,storage-proto}/**: Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue. These should not be categorized as trivial or nit-level concerns. Request proper error handling or explicit justification with invariants.
Also applies to: 277-280, 288-291
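A panic-free lookup could take roughly this shape. The `pending_retries` helper and the `String` error type are hypothetical stand-ins for the scheduler's real functions and error enum:

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct PendingExecution {
    retries: u32,
}

// Hypothetical sketch: fetch the pending execution without panicking.
// A missing entry becomes a recoverable error the caller can log and
// skip, instead of .expect() aborting the scheduler.
fn pending_retries(
    pending: &HashMap<i64, PendingExecution>,
    task_id: i64,
) -> Result<u32, String> {
    pending
        .get(&task_id)
        .map(|e| e.retries)
        .ok_or_else(|| format!("no pending execution for task {task_id}"))
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(7_i64, PendingExecution { retries: 2 });
    assert_eq!(pending_retries(&pending, 7), Ok(2_u32));
    assert!(pending_retries(&pending, 8).is_err());
    println!("ok");
}
```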
```rust
self.pending_executions.remove(&task.id);
self.db.insert_task(&task).await?;
self.task_queue
    .insert(task.clone(), Duration::from_millis(0));
```
Track the queue key for newly registered tasks.
This inserts into task_queue but never stores the returned Key. A task scheduled through process_request() cannot be removed by remove_task_from_queue() before its first run, and re-registering the same ID can leave stale queued copies behind.
Suggested fix:
```diff
 self.pending_executions.remove(&task.id);
 self.db.insert_task(&task).await?;
-self.task_queue
-    .insert(task.clone(), Duration::from_millis(0));
+if let Some(old_key) = self.task_queue_keys.remove(&task.id) {
+    self.task_queue.remove(&old_key);
+}
+let key = self.task_queue.insert(task.clone(), Duration::from_millis(0));
+self.task_queue_keys.insert(task.id, key);
 debug!("Registered task {} from context", task.id);
```
Summary
Note: this retries the scheduled instruction after a failure, reusing the same signature for as long as the blockhash remains valid, which allows recovery from temporary network or validator issues. It does not create a new transaction, and the scheduled instruction is still unscheduled if the transaction ultimately fails. This slightly hardens the crank without changing the logic or introducing duplicate-transaction issues.
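The behavior described here can be sketched as a small driver loop. `is_blockhash_valid` and `rebroadcast` below are stand-ins for the real RPC calls, and the `Outcome` enum is hypothetical:

```rust
// Hypothetical sketch of the summary's behavior: keep rebroadcasting the
// same signed transaction while its blockhash is valid, and unschedule
// the task once the blockhash expires without the tx landing.

#[derive(Debug, PartialEq)]
enum Outcome {
    Landed,
    Unscheduled,
}

// The closures stand in for RPC calls on the original transaction.
fn drive_retries(
    mut is_blockhash_valid: impl FnMut() -> bool,
    mut rebroadcast: impl FnMut() -> bool,
) -> Outcome {
    while is_blockhash_valid() {
        if rebroadcast() {
            // The same signature finally confirmed; no new tx was minted.
            return Outcome::Landed;
        }
    }
    // Blockhash expired: terminal failure, task gets unscheduled.
    Outcome::Unscheduled
}

fn main() {
    // Transient failure: second attempt lands within the blockhash window.
    let mut attempts = 0;
    let out = drive_retries(
        || true,
        || {
            attempts += 1;
            attempts >= 2
        },
    );
    assert_eq!(out, Outcome::Landed);

    // Blockhash already expired: the task is unscheduled.
    assert_eq!(drive_retries(|| false, || false), Outcome::Unscheduled);
    println!("ok");
}
```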