-
Notifications
You must be signed in to change notification settings - Fork 148
perf: add events.createBatch() for batch event creation #641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: pranaygp/perf-phase-3b-atomic-events
Are you sure you want to change the base?
perf: add events.createBatch() for batch event creation #641
Conversation
🦋 Changeset detectedLatest commit: 32f21aa The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
🧪 E2E Test ResultsNo test result files found. ❌ Some E2E test jobs failed:
Check the workflow run for details. |
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
853963d to
752bc6b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
f15434b to
32f21aa
Compare
5090eed to
faecbea
Compare
| // Update run status for run_completed events | ||
| if (runCompletedEvents.length > 0) { | ||
| const completedData = (runCompletedEvents[0] as any).eventData as { | ||
| output?: any; | ||
| }; | ||
| await drizzle | ||
| .update(Schema.runs) | ||
| .set({ | ||
| status: 'completed', | ||
| output: completedData.output as SerializedContent | undefined, | ||
| completedAt: now, | ||
| updatedAt: now, | ||
| }) | ||
| .where(eq(Schema.runs.runId, effectiveRunId)); | ||
| } | ||
|
|
||
| // Update run status for run_failed events | ||
| if (runFailedEvents.length > 0) { | ||
| const failedData = (runFailedEvents[0] as any).eventData as { | ||
| error: any; | ||
| errorCode?: string; | ||
| }; | ||
| const errorMessage = | ||
| typeof failedData.error === 'string' | ||
| ? failedData.error | ||
| : (failedData.error?.message ?? 'Unknown error'); | ||
| // Store structured error as JSON for deserializeRunError to parse | ||
| const errorJson = JSON.stringify({ | ||
| message: errorMessage, | ||
| stack: failedData.error?.stack, | ||
| code: failedData.errorCode, | ||
| }); | ||
| await drizzle | ||
| .update(Schema.runs) | ||
| .set({ | ||
| status: 'failed', | ||
| error: errorJson, | ||
| completedAt: now, | ||
| updatedAt: now, | ||
| }) | ||
| .where(eq(Schema.runs.runId, effectiveRunId)); | ||
| } | ||
|
|
||
| // Update run status for run_cancelled events | ||
| if (runCancelledEvents.length > 0) { | ||
| await drizzle | ||
| .update(Schema.runs) | ||
| .set({ | ||
| status: 'cancelled', | ||
| completedAt: now, | ||
| updatedAt: now, | ||
| }) | ||
| .where(eq(Schema.runs.runId, effectiveRunId)); | ||
| } | ||
|
|
||
| // Update run status for run_paused events | ||
| if (runPausedEvents.length > 0) { | ||
| await drizzle | ||
| .update(Schema.runs) | ||
| .set({ | ||
| status: 'paused', | ||
| updatedAt: now, | ||
| }) | ||
| .where(eq(Schema.runs.runId, effectiveRunId)); | ||
| } | ||
|
|
||
| // Update run status for run_resumed events | ||
| if (runResumedEvents.length > 0) { | ||
| await drizzle | ||
| .update(Schema.runs) | ||
| .set({ | ||
| status: 'running', | ||
| updatedAt: now, | ||
| }) | ||
| .where(eq(Schema.runs.runId, effectiveRunId)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The createBatch method is missing the hook cleanup logic that the create method performs when processing terminal run events (run_completed, run_failed, run_cancelled). This causes inconsistent behavior and prevents hook tokens from being reused.
View Details
📝 Patch Details
diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts
index 4d56f53..2075c10 100644
--- a/packages/world-postgres/src/storage.ts
+++ b/packages/world-postgres/src/storage.ts
@@ -777,6 +777,10 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
updatedAt: now,
})
.where(eq(Schema.runs.runId, effectiveRunId));
+ // Delete all hooks for this run to allow token reuse
+ await drizzle
+ .delete(Schema.hooks)
+ .where(eq(Schema.hooks.runId, effectiveRunId));
}
// Update run status for run_failed events
@@ -804,6 +808,10 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
updatedAt: now,
})
.where(eq(Schema.runs.runId, effectiveRunId));
+ // Delete all hooks for this run to allow token reuse
+ await drizzle
+ .delete(Schema.hooks)
+ .where(eq(Schema.hooks.runId, effectiveRunId));
}
// Update run status for run_cancelled events
@@ -816,6 +824,10 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
updatedAt: now,
})
.where(eq(Schema.runs.runId, effectiveRunId));
+ // Delete all hooks for this run to allow token reuse
+ await drizzle
+ .delete(Schema.hooks)
+ .where(eq(Schema.hooks.runId, effectiveRunId));
}
// Update run status for run_paused events
Analysis
Missing hook cleanup in createBatch() for terminal run events
What fails: The createBatch() method in packages/world-postgres/src/storage.ts fails to delete hooks when processing terminal run events (run_completed, run_failed, run_cancelled), unlike the create() method which properly cleans up hooks. This causes hook tokens to never be released for reuse, leading to potential token exhaustion in systems processing many workflow runs.
How to reproduce:
- Call
createBatch()with events including ahook_createdevent - Process a batch with a terminal event (run_completed, run_failed, or run_cancelled)
- Check the database hooks table - hooks will still exist for that run
- Attempt to create and complete another workflow run with the same hook token
- Observe that the old hooks are not cleaned up, preventing token reuse
Result: The hooks table retains entries for completed runs indefinitely. Comparing to the create() method behavior at lines 416-419, 451-454, and 471-474, these hooks should be deleted after status updates, but are not in createBatch().
Expected: Both create() and createBatch() should consistently delete hooks when terminal run events occur (lines 766-841 in createBatch should include hook deletion after each terminal status update, matching the pattern in create()).
Root cause: Missing drizzle.delete(Schema.hooks).where(eq(Schema.hooks.runId, effectiveRunId)) statements after the runCompletedEvents, runFailedEvents, and runCancelledEvents status update blocks in createBatch().
Implementation note: Added hook deletion logic to match the exact pattern used in the create() method, ensuring consistent behavior across both APIs and allowing hook tokens to be reused for future workflow runs.

🤖 Generated with Claude Code
Co-Authored-By: Claude Opus 4.5 [email protected]