Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ tags:
- auto-compilation
- worker-management
- developer-experience
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ tags:
- compilation
- configuration
- breaking-change
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
draft: true
draft: false
title: 'pgflow 0.12.0: Simpler Handler Signatures for Flow Composition'
description: 'Breaking change: asymmetric handler signatures remove the run wrapper, enabling functional composition'
date: 2025-12-24
Expand All @@ -18,17 +18,25 @@ pgflow 0.12.0 introduces asymmetric handler signatures - a breaking change that

## What Changed

Handler signatures are now asymmetric based on step type:
Handler signatures are now **asymmetric**. The `input.run` pattern no longer exists.

| Step Type | Before | After |
|-----------|--------|-------|
| Root step | `(input) => input.run.xxx` | `(flowInput, ctx) => flowInput.xxx` |
| Dependent step | `(input) => input.dep.xxx` | `(deps, ctx) => deps.dep.xxx` |
| Dependent (needs flowInput) | `(input) => input.run.xxx` | `async (deps, ctx) => (await ctx.flowInput).xxx` |
| Map step | `(item) => ...` | `async (item, ctx) => (await ctx.flowInput).xxx` |
```typescript del="input" del="input.run" del="input.dep" ins="flowInput" ins="deps" ins="async " ins="deps, ctx" ins="(await ctx.flowInput)"

// Root steps
(input) => input.run.xxx
(flowInput) => flowInput.xxx

// Dependent steps
(input) => input.dep.xxx
(deps) => deps.dep.xxx

// Dependent needing flowInput
(input) => input.run.xxx
async (deps, ctx) => (await ctx.flowInput).xxx
```

<Aside type="caution" title="Breaking Change">
All handler signatures must be updated. The `input.run` pattern no longer exists.
All handler signatures must be updated. See migration examples below.
</Aside>

## Why This Change?
Expand All @@ -49,74 +57,23 @@ const ChildFlow = new Flow<{ data: string }>()
});
```

By removing the wrapper, outputs from one flow can now become inputs to another without transformation.

## New Handler Signatures

### Root Steps

Root steps receive flow input directly as the first parameter:

```typescript
// .step() - root
.step({ slug: 'init' }, (flowInput, ctx) => {
console.log(ctx.env.API_KEY); // Context still available
return { userId: flowInput.userId };
})

// .array() - root (returns array for parallel processing)
.array({ slug: 'fetchAll' }, (flowInput, ctx) => {
return flowInput.urls; // Returns array directly
})
```

### Dependent Steps
By removing the wrapper, outputs from one flow can become inputs to another without transformation.

Dependent steps receive only their dependencies as the first parameter. Access `flowInput` via context if needed (async):

```typescript
// .step() - dependent (needs flowInput) - must be async
.step({ slug: 'process', dependsOn: ['init'] }, async (deps, ctx) => {
const flowInput = await ctx.flowInput; // Lazy-loaded, must await
const config = flowInput.config;
const data = deps.init.userId; // Dependencies directly
return { result: process(data, config) };
})

// .step() - dependent (doesn't need flowInput - most common)
.step({ slug: 'save', dependsOn: ['process'] }, (deps) => {
return deps.process.result; // Just use deps
})

// .array() - dependent
.array({ slug: 'splitResults', dependsOn: ['process'] }, (deps, ctx) => {
return deps.process.items; // Returns array for parallel processing
})
```

### Map Steps
<Aside type="note" title="Future Feature">
Flow composition (subflows) will be implemented in upcoming releases. This breaking change lays the groundwork by ensuring handler signatures are compatible with composable flows.
</Aside>

Map steps receive individual array elements. Access `flowInput` via context (async):
## Upgrading Your Flows

```typescript
// .map() - processes each array element (async if needs flowInput)
.map({ slug: 'processItem', array: 'fetchAll' }, async (item, ctx) => {
const flowInput = await ctx.flowInput; // Lazy-loaded, must await
return transform(item, flowInput.config);
})
```
Apply these patterns to update your handlers. The red/green highlights show exactly what changes.

<Aside type="tip" title="Why is ctx.flowInput async?">
`ctx.flowInput` is lazy-loaded to prevent data duplication. For map steps processing thousands of items, including the full flow input in each task would multiply data transfer. Instead, it's fetched on-demand and cached per run.
</Aside>

## Upgrading Your Flows

### Migration Patterns

Apply these transformations to your handlers:

#### Root Steps (.step, .array)
#### Root Steps (same for `.step` and `.array`)

```typescript del="input" del="input.run" ins="flowInput"
// BEFORE
Expand All @@ -130,18 +87,6 @@ Apply these transformations to your handlers:
})
```

```typescript del="input" del="input.run" ins="flowInput"
// BEFORE
.array({ slug: 'getUrls' }, (input) => {
return input.run.urls;
})

// AFTER
.array({ slug: 'getUrls' }, (flowInput) => {
return flowInput.urls;
})
```

#### Dependent Steps - Needing flowInput

```typescript del="input" del="input.run" del="input.init" ins="async" ins="deps, ctx" ins="await ctx.flowInput" ins="deps.init"
Expand Down Expand Up @@ -175,15 +120,17 @@ Apply these transformations to your handlers:
})
```

#### Map Steps - Accessing flowInput
#### Map Steps (no change needed if only using `item`)

Most map steps just use `item` and need no changes. Only update if you need `flowInput`:

```typescript ins="async" ins="item, ctx" ins="await ctx.flowInput"
// BEFORE
// BEFORE (accessing flowInput)
.map({ slug: 'transform', array: 'items' }, (item) => {
return process(item);
})

// AFTER (if you need flowInput - must be async)
// AFTER (must be async to access flowInput)
.map({ slug: 'transform', array: 'items' }, async (item, ctx) => {
const flowInput = await ctx.flowInput;
return process(item, flowInput.options);
Expand All @@ -192,6 +139,16 @@ Apply these transformations to your handlers:

### Production Upgrade Guide

The upgrade requires careful coordination to avoid running old code against the new SQL schema.

<Aside type="danger" title="Critical: No Workers Running During Migration">
It is **very important** to ensure:
- All worker functions are **disabled** (step 2)
- All existing workers have **stopped** (steps 3-4)

Running workers with old handler signatures against the new SQL schema will cause failures. Deprecated workers will gracefully finish their current task and exit - wait for this to complete before proceeding with the migration.
</Aside>

<Steps>

1. **Update handlers locally and test**
Expand Down Expand Up @@ -223,19 +180,31 @@ Apply these transformations to your handlers:

Deprecated workers finish their current task but won't call `start_tasks` again - so they won't be affected by the SQL changes.

4. **Apply database migration**
4. **Wait for workers to stop**

Monitor workers until all have exited:

```sql
SELECT COUNT(*) FROM pgflow.workers
WHERE function_name = 'my-worker'
AND stopped_at IS NULL;
```

Wait until this returns `0` before proceeding.

5. **Apply database migration**

```bash frame="none"
npx supabase db push
```

5. **Deploy new workers**
6. **Deploy new workers**

```bash frame="none"
npx supabase functions deploy my-worker
```

6. **Enable worker functions**
7. **Enable worker functions**

```sql
UPDATE pgflow.worker_functions
Expand All @@ -247,6 +216,11 @@ Apply these transformations to your handlers:

</Steps>

## Other Fixes

- Fixed `CONNECT_TIMEOUT` errors on Lovable.dev by switching to `jsr:@oscar6echo/postgres` fork
- Fixed `setTimeout` context binding issue in `@pgflow/client` for browser compatibility

---

Questions or issues? Join the [Discord community](https://discord.gg/pgflow) or [open a GitHub issue](https://github.com/pgflow-dev/pgflow/issues).
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tags:
- dsl
- cli
- client
featured: true
featured: false
cover:
alt: 'Cyberpunk workflow engine with glowing teal circuits processing parallel data streams'
image: '../../../assets/cover-images/pgflow-0-7-0-public-beta-map-steps-and-documentation-redesign.png'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ tags:
- bugfix
- supabase
- pgmq
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ tags:
- realtime
- client
- core
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ tags:
- bugfix
- realtime
- client
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ tags:
- release
- breaking-change
- migration
featured: true
featured: false
cover:
alt: 'Cyberpunk hacker upgrading database dependencies on a glitchy terminal screen'
image: '../../../assets/cover-images/pgflow-0-8-0-modernizing-dependencies-pgmq-1-5-0-and-postgresql-17.png'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ tags:
- release
- control-plane
- compilation
featured: true
featured: false
---

import { Aside } from "@astrojs/starlight/components";
Expand Down