diff --git a/src/execution/CreateWorkQueue.md b/src/execution/CreateWorkQueue.md
new file mode 100644
index 0000000000..ff5de7cee0
--- /dev/null
+++ b/src/execution/CreateWorkQueue.md
@@ -0,0 +1,206 @@
+## Creating the Work Queue
+
+### Overview
+
+{CreateWorkQueue} returns the initial work and an event stream that reports work progress. Internally, the queue's state is represented as a directed acyclic graph with its own queue of task-completion and stream events. The graph evolves as work proceeds, modeling relationships among groups, tasks, and streams; new nodes may be discovered on internal {TASK_SUCCESS} or {STREAM_ITEMS} events, each of which provides a value and optional new work (e.g. new groups, tasks, or streams).
+
+The internal events include:
+
+- {TASK_SUCCESS} reports that a {task} completed successfully with a {result} (containing a {value} and optional {work}).
+- {TASK_FAILURE} carries the failed {task} and the causal {error}.
+- {STREAM_ITEMS} reports that a {stream} produced one or more {items} (each containing a {value} and optional {work}).
+- {STREAM_SUCCESS} announces that a {stream} completed successfully.
+- {STREAM_FAILURE} includes both the {stream} and the terminal {error}.
+
+Consumers of the work queue do not see these internal events directly; instead, they receive external events emitted on the returned observable {workEventStream}. These external events summarize progress at the granularity of groups and streams, abstracting away individual task completions. Events that expose new work also enumerate the newly visible segments (groups and streams that became roots) so consumers can react without re-querying the queue's internal state.
+
+The external events include {STREAM_SUCCESS} and {STREAM_FAILURE}, which correspond directly to their internal counterparts, as well as:
+
+- {GROUP_VALUES} delivers the task {values} for a completed root {group} (tasks already surfaced via other groups are omitted).
+- {GROUP_SUCCESS} delivers the completed {group} and any {newGroups}/{newStreams} that became roots once the {group} left the graph.
+- {GROUP_FAILURE} carries the failed {group} and the propagated {error}.
+- {STREAM_VALUES} reports the producing {stream}, the yielded batch of item {values}, and any {newGroups}/{newStreams} introduced once the batch's {work} is integrated.
+- {WORK_QUEUE_TERMINATION} signals that there are no more root groups or streams.
+
+The work event stream concludes when the graph is empty.
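+
+As a sketch (assuming the {createWorkQueue} entry point from `WorkQueue.ts`; `initialWork` is a {Work} payload and `deliver` is a hypothetical delivery hook), a consumer drives the queue like this:
+
+```ts
+const { initialGroups, initialStreams, events } = createWorkQueue(initialWork);
+// `events` is an async generator that yields batches of external events.
+for await (const batch of events) {
+  for (const event of batch) {
+    switch (event.kind) {
+      case 'GROUP_VALUES':
+        deliver(event.group, event.values); // hypothetical delivery hook
+        break;
+      case 'GROUP_SUCCESS':
+        // event.newGroups and event.newStreams became roots
+        break;
+      case 'WORK_QUEUE_TERMINATION':
+        // no more root groups or streams; `events` is about to close
+        break;
+      // ... GROUP_FAILURE, STREAM_VALUES, STREAM_SUCCESS, STREAM_FAILURE
+    }
+  }
+}
+```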
+
+### Graph Definition
+
+- Let the graph be {G} = ({V}, {Egc}, {Egt}, {Ets}, {Etv}), where:
+ - {V} = {Vgroup} ∪ {Vtask} ∪ {Vstream} ∪ {Vvalue}.
+  - {Egc} are {group} → {childGroup} edges, capturing that a {childGroup} can begin only after its parent group succeeds.
+ - {Egt} are {group} → {task} edges.
+  - {Ets} are {task} → {stream} edges, capturing that a {stream} can begin only after its producing {task} completes and a {group} targeting that {task} succeeds.
+ - {Etv} are {task} → {value} edges, each pointing to a node that stores the completion value of its source task.
+- A root {group} has no incoming edge in {Egc}.
+- A root {stream} has no incoming edge in {Ets}.
+- Each {task} may be targeted by more than one {group} via {Egt}. A {task} always includes the list of {groups} that target it, as well as a function to execute. A {task} executes once and either fails with an {error} or succeeds with a {value} and optional {work}. The {value} component is stored in a {value} node targeted by {task} via {Etv}. Any {work} is immediately integrated (by {IntegrateWork}).
+- A {group} completes when every {task} it targets has completed successfully. A {group} fails if any {task} it targets fails.
+- A {stream} runs once started, producing zero or more items and then terminating either successfully or with an {error}. Each produced item yields a {value} and optional {work}; the {values} for each batch are reported via the internal {STREAM_ITEMS} event (which drives the external {STREAM_VALUES} event surfaced to consumers) and the {work} is immediately integrated into the graph.
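+
+For concreteness, a minimal sketch of the work shapes (using the {Group}, {Task}, and {Work} types from `WorkQueue.ts` with type parameters elided; the concrete values are placeholders):
+
+```ts
+interface MyGroup {
+  parent?: MyGroup | undefined; // satisfies the Group constraint
+}
+const parentGroup: MyGroup = {};
+const childGroup: MyGroup = { parent: parentGroup };
+// A task lists every group that targets it (the Egt edges) and runs once,
+// resolving to a { value, work? } result.
+const task = new Task(() => ({ value: 'data' }), [parentGroup, childGroup]);
+const work = { groups: [parentGroup, childGroup], tasks: [task] };
+```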
+
+### Creating the Work Queue
+
+This algorithm constructs the internal {graphEventQueue}, initializes the graph with the supplied {groups}, {tasks}, and {streams}, and returns the observable {workEventStream}.
+
+CreateWorkQueue(work):
+
+- Let {graphEventQueue} be a new queue (capacity 1).
+- Let {graph} be initially empty.
+- Let {maybeEmptyNewGroups} and {newStreams} be the result of {IntegrateWork(work)}.
+- Let {newGroups} be the result of {PruneEmptyGroups(maybeEmptyNewGroups)}.
+- Call {StartWork(newGroups, newStreams)}.
+- Let {workEventStream} be the following event stream (each iteration yields the batch of external events produced while handling one batch of graph events):
+ - For each event {e} produced by {graphEventQueue}:
+ - If {e} is {TASK_SUCCESS(task, result)}, call {TaskSuccess(task, result)}.
+ - If {e} is {TASK_FAILURE(task, error)}, call {TaskFailure(task, error)}.
+ - If {e} is {STREAM_ITEMS(stream, items)}, call {StreamItems(stream, items)}.
+ - If {e} is {STREAM_SUCCESS(stream)}, call {StreamSuccess(stream)}.
+ - If {e} is {STREAM_FAILURE(stream, error)}, call {StreamFailure(stream, error)}.
+  - If {graph} has no root groups or root streams, yield {WORK_QUEUE_TERMINATION} and close {workEventStream}.
+
+- Return {newGroups}, {newStreams}, and {workEventStream}.
+
+The following algorithms have access to {graphEventQueue}, {graph}, and {workEventStream}.
+
+#### Integrating and Starting Work
+
+IntegrateWork(work, parentTask):
+
+- Let {groups}, {tasks}, and {streams} be the respective sets drawn from {work} (missing fields denote empty sets).
+- Initialize {visitedGroups} to the empty set and {rootGroups} to the empty list.
+- For each group {g} in {groups}:
+  - Let {maybeRoot} be the result of {AddGroup(g, groups, parentTask, visitedGroups)}.
+ - If {maybeRoot} is defined, append it to {rootGroups}.
+- For each task {t} in {tasks}:
+ - Insert {t} into {graph}.
+  - Let {targetingGroups} be the list of groups that target {t}.
+  - For each group {g} in {targetingGroups}:
+    - Record {g → t}.
+    - If {g} is a root and {t} has not yet started, call {StartTask(t)}.
+- For each stream {s} in {streams}:
+ - Insert {s} into {graph}.
+ - If {parentTask} is defined, record {parentTask → s}; otherwise {s} is a root.
+- Return the newly inserted root {groups} (namely {rootGroups}) and root {streams}.
+
+AddGroup(group, groupSet, parentTask, visitedGroups):
+
+- If {group} is in {visitedGroups}, return.
+- Add {group} to {visitedGroups}.
+- If {parentTask} is defined, assert that {group} specifies a {parent} group; initial work and stream items are the only sources of root groups.
+- If {group} does not specify a {parent} group:
+ - Insert {group} into {graph}.
+ - Return {group}.
+- If {parent} is in {groupSet}:
+ - Let {ancestor} be the result of {AddGroup(parent, groupSet, parentTask, visitedGroups)}.
+ - Insert {group} into {graph} and record {parent → group}.
+ - Return {ancestor}.
+- Otherwise, if {parent} is in {graph}:
+  - Insert {group} into {graph} and record {parent → group}.
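+
+For example, if a single {work} payload contains both {parentGroup} and {childGroup} (where {childGroup} specifies {parentGroup} as its {parent}) and {childGroup} is visited first, {AddGroup} recurses into {parentGroup} before inserting {childGroup}, so only {parentGroup} is reported as a new root.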
+
+PruneEmptyGroups(originalGroups):
+
+- Initialize {nonEmptyGroups} to the empty list.
+- For each {group} in {originalGroups}:
+ - If {group} targets at least one {task}, append {group} to {nonEmptyGroups} and continue to the next {group} in {originalGroups}.
+ - Let {maybeEmptyNewGroups} be the set of child groups targeted by {group}.
+ - Append the results of {PruneEmptyGroups(maybeEmptyNewGroups)} to {nonEmptyGroups}.
+ - Remove {group} from {graph}.
+- Return {nonEmptyGroups}.
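+
+For example, a {group} that targets no {tasks} but has non-empty child groups is removed from {graph} and its children are returned in its place, so consumers never observe empty root groups.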
+
+StartWork(newGroups, newStreams):
+
+- For each {group} in {newGroups}, call {StartGroup(group)}.
+- For each {stream} in {newStreams}, call {StartStream(stream)}.
+
+StartGroup(group):
+
+- For each {task} targeted by {group}:
+ - Call {StartTask(task)}.
+
+StartTask(task):
+
+- Start executing {task} such that:
+  - If and when {task} completes successfully with {result}, enqueue {TASK_SUCCESS(task, result)} onto {graphEventQueue}.
+  - If and when {task} fails with {error}, enqueue {TASK_FAILURE(task, error)} onto {graphEventQueue}.
+
+StartStream(stream):
+
+- Drain {stream} such that:
+  - On each successfully produced batch of {items}, enqueue {STREAM_ITEMS(stream, items)} onto {graphEventQueue}.
+  - If and when {stream} terminates successfully, enqueue {STREAM_SUCCESS(stream)} onto {graphEventQueue}.
+  - If and when {stream} terminates with {error}, enqueue {STREAM_FAILURE(stream, error)} onto {graphEventQueue}.
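+
+Streams are backed by the {Queue} primitive from `Queue.ts`. As a sketch (the item values are placeholders), a producer might look like:
+
+```ts
+const stream = {
+  queue: new Queue(async ({ push, stop }) => {
+    // Each pushed item carries a value and optional new work;
+    // here both items carry plain values only.
+    await push({ value: 'first' });
+    await push({ value: 'second' });
+    stop(); // surfaces STREAM_SUCCESS once all items are drained
+  }),
+};
+```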
+
+### Handling Task Success
+
+{TaskSuccess(task, result)} reconciles a completed task. It first integrates any new {work} carried by the {result}, then inspects the groups that target the {task}: any root group whose tasks are all complete emits {GROUP_SUCCESS}, is removed from the graph, and takes its completed tasks (and their value nodes) with it. Because each task's work is integrated at completion time, removing the group merely frees newly exposed child groups and downstream streams to become root candidates whose work can be started. Each {GROUP_SUCCESS} event includes the resolved {group} and any {newGroups}/{newStreams} that surfaced because the {group} left the graph; the task {values} themselves are delivered via a preceding {GROUP_VALUES} event.
+
+TaskSuccess(task, result):
+
+- If {task} is no longer in {graph}, return early.
+- Let {value} and {work} be the respective fields of {result}.
+- Mark {task} complete by inserting a {value} node {v} into {Vvalue} that stores {value} and recording the edge {task → v} in {Etv}.
+- If {work} exists, call {IntegrateWork(work, task)}.
+- For each group {g} with {g → task} (over a snapshot of these edges taken before any are removed during iteration):
+ - If all {tasks} targeted by {g} are complete and {g} is a root:
+ - Call {GroupSuccess(g)}.
+
+GroupSuccess(group):
+
+- Let {values} be the list of {value} entries for each {task} targeted by {group}; the tasks themselves are **not** included alongside the values, and ordering is intentionally unspecified.
+- Remove {group} from {graph}, promoting each direct child group of {group} to a root candidate; let {maybeEmptyNewGroups} be the set of such promoted child groups.
+- Let {newGroups} be the result of {PruneEmptyGroups(maybeEmptyNewGroups)}.
+- Remove all {task} nodes targeted by {group} along with any associated {value} nodes, promoting each child stream of each removed {task} to a root stream; let {newStreams} be the set of such promoted streams.
+- If {values} is non-empty, yield {GROUP_VALUES: { group, values }} on {workEventStream}.
+- Yield {GROUP_SUCCESS: { group, newGroups, newStreams }} on {workEventStream}.
+- Call {StartWork(newGroups, newStreams)}, which starts every not-yet-started {task} of each new root {group} and drains each new root {stream}.
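+
+For example, if a {task} is targeted by both {groupA} and {groupB} and {groupA} completes first, the {task}'s {value} is delivered with {groupA}'s {GROUP_VALUES} event and the {task} is removed; when {groupB} later completes, it contributes no duplicate entry for that {value}.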
+
+### Handling Task Failure
+
+{TaskFailure(task, error)} handles the error path for a task by emitting {GROUP_FAILURE} for every group that targets it. Each payload reports the failed {group} and the propagated {error} so consumers can surface diagnostics. Because a task failure invalidates all dependent subgraphs, this procedure also tears down orphaned tasks, groups, and streams.
+
+TaskFailure(task, error):
+
+- If {task} is no longer in {graph}, return early.
+- For each group {group} with {group → task}:
+ - Yield {GROUP_FAILURE: { group, error }} on {workEventStream}.
+ - Remove {group} from {graph} along with its descendant groups.
+- Remove task nodes not targeted by any other group (tasks that were only targeted by the removed {group} or its removed descendants) and discard any associated {value} or {stream} nodes. Work associated with removed tasks and streams may be cancelled.
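+
+For example, if a sibling {task} of the failed {task} is also targeted by a {group} outside the failed subgraph, that sibling remains in {graph} and its eventual completion still counts toward the surviving {group}.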
+
+### Handling Stream Items
+
+{StreamItems} moves incremental data from a running stream to the consumer. Each resulting {STREAM_VALUES} event reports the producing {stream}, the yielded batch of item {values}, and any {newGroups}/{newStreams} introduced once each item's {work} is integrated. This event is the only one that carries arbitrary payload data from user-defined streams; all others describe structural progress.
+
+StreamItems(stream, items):
+
+- Let {values}, {newGroups}, and {newStreams} start empty.
+- For each {item} in {items}:
+ - Let {value} and {work} be the respective fields of {item}.
+ - If {work} exists:
+ - Let {maybeEmptyItemNewGroups} and {itemNewStreams} be the result of {IntegrateWork(work)}.
+ - Append the result of {PruneEmptyGroups(maybeEmptyItemNewGroups)} to {newGroups}.
+ - Append {itemNewStreams} to {newStreams}.
+ - Append {value} to {values}.
+- Call {StartWork(newGroups, newStreams)}.
+- Yield {STREAM_VALUES: { stream, values, newGroups, newStreams }} on {workEventStream}, where {newGroups} and {newStreams} denote the nodes that became roots as a result of integrating the batch's {work}.
+- If the {stream} queue is already stopped after draining {items}, also yield {STREAM_SUCCESS} and remove {stream} from {graph}.
+
+### Handling Stream Completion
+
+{StreamSuccess} emits a terminal notification for streams, yielding {STREAM_SUCCESS} with the successful stream.
+
+StreamSuccess(stream):
+
+- Yield {STREAM_SUCCESS: { stream: {stream} }} on {workEventStream}.
+- Remove {stream} from {graph}.
+
+### Handling Stream Errors
+
+{StreamFailure} yields {STREAM_FAILURE} with both {stream} and the causal {error}.
+
+StreamFailure(stream, error):
+
+- Yield {STREAM_FAILURE: { stream: {stream}, error: {error} }} on {workEventStream}.
+- Remove {stream} from {graph}.
diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts
deleted file mode 100644
index 971e08a271..0000000000
--- a/src/execution/IncrementalGraph.ts
+++ /dev/null
@@ -1,320 +0,0 @@
-import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
-import { invariant } from '../jsutils/invariant.js';
-import { isPromise } from '../jsutils/isPromise.js';
-
-import type { GraphQLError } from '../error/GraphQLError.js';
-
-import { Queue } from './Queue.js';
-import type {
- DeferredFragmentRecord,
- DeliveryGroup,
- IncrementalDataRecord,
- IncrementalDataRecordResult,
- PendingExecutionGroup,
- StreamItemRecord,
- StreamRecord,
- SuccessfulExecutionGroup,
-} from './types.js';
-import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
-
-/**
- * @internal
- */
-export class IncrementalGraph {
- private _rootNodes: Set;
- private _completed: Queue;
- // _push and _stop are assigned in the executor which is executed
- // synchronously by the Queue constructor.
- private _push!: (item: IncrementalDataRecordResult) => void;
- private _stop!: () => void;
-
- constructor() {
- this._rootNodes = new Set();
- this._completed = new Queue((push, stop) => {
- this._push = push;
- this._stop = stop;
- });
- }
-
- getNewRootNodes(
- newDeferredFragmentRecords:
- | ReadonlyArray
- | undefined,
- incrementalDataRecords: ReadonlyArray,
- ): ReadonlyArray {
- const initialResultChildren = new Set();
-
- if (newDeferredFragmentRecords !== undefined) {
- for (const deferredFragmentRecord of newDeferredFragmentRecords) {
- this._addDeferredFragment(
- deferredFragmentRecord,
- initialResultChildren,
- );
- }
- }
-
- this._addIncrementalDataRecords(
- incrementalDataRecords,
- undefined,
- initialResultChildren,
- );
- return this._promoteNonEmptyToRoot(initialResultChildren);
- }
-
- addCompletedSuccessfulExecutionGroup(
- successfulExecutionGroup: SuccessfulExecutionGroup,
- ): void {
- const {
- pendingExecutionGroup,
- newDeferredFragmentRecords,
- incrementalDataRecords,
- } = successfulExecutionGroup;
-
- if (newDeferredFragmentRecords !== undefined) {
- for (const deferredFragmentRecord of newDeferredFragmentRecords) {
- this._addDeferredFragment(deferredFragmentRecord, undefined);
- }
- }
-
- const deferredFragmentRecords =
- pendingExecutionGroup.deferredFragmentRecords;
-
- for (const deferredFragmentRecord of deferredFragmentRecords) {
- const { pendingExecutionGroups, successfulExecutionGroups } =
- deferredFragmentRecord;
- pendingExecutionGroups.delete(pendingExecutionGroup);
- successfulExecutionGroups.add(successfulExecutionGroup);
- }
-
- if (incrementalDataRecords !== undefined) {
- this._addIncrementalDataRecords(
- incrementalDataRecords,
- deferredFragmentRecords,
- );
- }
- }
-
- hasNext(): boolean {
- return this._rootNodes.size > 0;
- }
-
- completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord):
- | {
- newRootNodes: ReadonlyArray;
- successfulExecutionGroups: ReadonlyArray;
- }
- | undefined {
- if (
- !this._rootNodes.has(deferredFragmentRecord) ||
- deferredFragmentRecord.pendingExecutionGroups.size > 0
- ) {
- return;
- }
- const successfulExecutionGroups = Array.from(
- deferredFragmentRecord.successfulExecutionGroups,
- );
- this._rootNodes.delete(deferredFragmentRecord);
- for (const successfulExecutionGroup of successfulExecutionGroups) {
- for (const otherDeferredFragmentRecord of successfulExecutionGroup
- .pendingExecutionGroup.deferredFragmentRecords) {
- otherDeferredFragmentRecord.successfulExecutionGroups.delete(
- successfulExecutionGroup,
- );
- }
- }
- const newRootNodes = this._promoteNonEmptyToRoot(
- deferredFragmentRecord.children,
- );
- this._maybeStop();
- return { newRootNodes, successfulExecutionGroups };
- }
-
- removeDeferredFragment(
- deferredFragmentRecord: DeferredFragmentRecord,
- ): boolean {
- const deleted = this._rootNodes.delete(deferredFragmentRecord);
- if (!deleted) {
- return false;
- }
- this._maybeStop();
- return true;
- }
-
- removeStream(streamRecord: StreamRecord): void {
- this._rootNodes.delete(streamRecord);
- this._maybeStop();
- }
-
- subscribe(
- mapFn: (generator: Generator) => U | undefined,
- ): AsyncGenerator {
- return this._completed.subscribe(mapFn);
- }
-
- private _addIncrementalDataRecords(
- incrementalDataRecords: ReadonlyArray,
- parents: ReadonlyArray | undefined,
- initialResultChildren?: Set,
- ): void {
- for (const incrementalDataRecord of incrementalDataRecords) {
- if (isPendingExecutionGroup(incrementalDataRecord)) {
- for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) {
- deferredFragmentRecord.pendingExecutionGroups.add(
- incrementalDataRecord,
- );
- }
- if (this._completesRootNode(incrementalDataRecord)) {
- this._onExecutionGroup(incrementalDataRecord);
- }
- } else if (parents === undefined) {
- invariant(initialResultChildren !== undefined);
- initialResultChildren.add(incrementalDataRecord);
- } else {
- for (const parent of parents) {
- parent.children.add(incrementalDataRecord);
- }
- }
- }
- }
-
- private _promoteNonEmptyToRoot(
- maybeEmptyNewRootNodes: Set,
- ): ReadonlyArray {
- const newRootNodes: Array = [];
- for (const node of maybeEmptyNewRootNodes) {
- if (isDeferredFragmentRecord(node)) {
- if (node.pendingExecutionGroups.size > 0) {
- for (const pendingExecutionGroup of node.pendingExecutionGroups) {
- if (!this._completesRootNode(pendingExecutionGroup)) {
- this._onExecutionGroup(pendingExecutionGroup);
- }
- }
- this._rootNodes.add(node);
- newRootNodes.push(node);
- continue;
- }
- for (const child of node.children) {
- maybeEmptyNewRootNodes.add(child);
- }
- } else {
- this._rootNodes.add(node);
- newRootNodes.push(node);
-
- // eslint-disable-next-line @typescript-eslint/no-floating-promises
- this._onStreamItems(node);
- }
- }
- return newRootNodes;
- }
-
- private _completesRootNode(
- pendingExecutionGroup: PendingExecutionGroup,
- ): boolean {
- return pendingExecutionGroup.deferredFragmentRecords.some(
- (deferredFragmentRecord) => this._rootNodes.has(deferredFragmentRecord),
- );
- }
-
- private _addDeferredFragment(
- deferredFragmentRecord: DeferredFragmentRecord,
- initialResultChildren: Set | undefined,
- ): void {
- const parent = deferredFragmentRecord.parent;
- if (parent === undefined) {
- invariant(initialResultChildren !== undefined);
- initialResultChildren.add(deferredFragmentRecord);
- return;
- }
- parent.children.add(deferredFragmentRecord);
- }
-
- private _onExecutionGroup(
- pendingExecutionGroup: PendingExecutionGroup,
- ): void {
- let completedExecutionGroup = pendingExecutionGroup.result;
- if (!(completedExecutionGroup instanceof BoxedPromiseOrValue)) {
- completedExecutionGroup = completedExecutionGroup();
- }
- const value = completedExecutionGroup.value;
- if (isPromise(value)) {
- // eslint-disable-next-line @typescript-eslint/no-floating-promises
- value.then((resolved) => this._push(resolved));
- } else {
- this._push(value);
- }
- }
-
- private async _onStreamItems(streamRecord: StreamRecord): Promise {
- let items: Array = [];
- let errors: Array = [];
- let newDeferredFragmentRecords: Array = [];
- let incrementalDataRecords: Array = [];
- const streamItemQueue = streamRecord.streamItemQueue;
- let streamItemRecord: StreamItemRecord | undefined;
- while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
- let result =
- streamItemRecord instanceof BoxedPromiseOrValue
- ? streamItemRecord.value
- : streamItemRecord().value;
- if (isPromise(result)) {
- if (items.length > 0) {
- this._push({
- streamRecord,
- result:
- // TODO add additional test case or rework for coverage
- errors.length > 0 /* c8 ignore start */
- ? { items, errors } /* c8 ignore stop */
- : { items },
- newDeferredFragmentRecords,
- incrementalDataRecords,
- });
- items = [];
- errors = [];
- newDeferredFragmentRecords = [];
- incrementalDataRecords = [];
- }
- // eslint-disable-next-line no-await-in-loop
- result = await result;
- // wait an additional tick to coalesce resolving additional promises
- // within the queue
- // eslint-disable-next-line no-await-in-loop
- await Promise.resolve();
- }
- if (result.item === undefined) {
- if (items.length > 0) {
- this._push({
- streamRecord,
- result: errors.length > 0 ? { items, errors } : { items },
- newDeferredFragmentRecords,
- incrementalDataRecords,
- });
- }
- this._push(
- result.errors === undefined
- ? { streamRecord }
- : {
- streamRecord,
- errors: result.errors,
- },
- );
- return;
- }
- items.push(result.item);
- if (result.errors !== undefined) {
- errors.push(...result.errors);
- }
- if (result.newDeferredFragmentRecords !== undefined) {
- newDeferredFragmentRecords.push(...result.newDeferredFragmentRecords);
- }
- if (result.incrementalDataRecords !== undefined) {
- incrementalDataRecords.push(...result.incrementalDataRecords);
- }
- }
- }
-
- private _maybeStop(): void {
- if (!this.hasNext()) {
- this._stop();
- }
- }
-}
diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts
index ab07f9ac76..42bf8f2edf 100644
--- a/src/execution/IncrementalPublisher.ts
+++ b/src/execution/IncrementalPublisher.ts
@@ -4,33 +4,29 @@ import { pathToArray } from '../jsutils/Path.js';
import type { GraphQLError } from '../error/GraphQLError.js';
import type { AbortSignalListener } from './AbortSignalListener.js';
-import { IncrementalGraph } from './IncrementalGraph.js';
+import { mapAsyncIterable } from './mapAsyncIterable.js';
import type {
- CompletedExecutionGroup,
CompletedResult,
DeferredFragmentRecord,
- DeliveryGroup,
+ ExecutionGroupValue,
ExperimentalIncrementalExecutionResults,
- IncrementalDataRecord,
- IncrementalDataRecordResult,
IncrementalDeferResult,
IncrementalResult,
- IncrementalStreamResult,
+ IncrementalWork,
InitialIncrementalExecutionResult,
PendingResult,
- StreamItemsResult,
+ StreamItemValue,
StreamRecord,
SubsequentIncrementalExecutionResult,
} from './types.js';
-import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js';
import { withCleanup } from './withCleanup.js';
+import type { WorkQueueEvent } from './WorkQueue.js';
+import { createWorkQueue } from './WorkQueue.js';
-// eslint-disable-next-line max-params
export function buildIncrementalResponse(
   result: ObjMap<unknown>,
   errors: ReadonlyArray<GraphQLError>,
- newDeferredFragmentRecords: ReadonlyArray | undefined,
- incrementalDataRecords: ReadonlyArray,
+ work: IncrementalWork,
   earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
abortSignalListener: AbortSignalListener | undefined,
): ExperimentalIncrementalExecutionResults {
@@ -38,63 +34,55 @@ export function buildIncrementalResponse(
earlyReturns,
abortSignalListener,
);
- return incrementalPublisher.buildResponse(
- result,
- errors,
- newDeferredFragmentRecords,
- incrementalDataRecords,
- );
+ return incrementalPublisher.buildResponse(result, errors, work);
}
interface SubsequentIncrementalExecutionResultContext {
   pending: Array<PendingResult>;
   incremental: Array<IncrementalResult>;
   completed: Array<CompletedResult>;
+ hasNext: boolean;
}
/**
- * This class is used to publish incremental results to the client, enabling semi-concurrent
- * execution while preserving result order.
- *
* @internal
*/
-class IncrementalPublisher {
+export class IncrementalPublisher {
+  private _ids: Map<DeferredFragmentRecord | StreamRecord, string>;
   private _earlyReturns: Map<StreamRecord, () => Promise<unknown>>;
private _abortSignalListener: AbortSignalListener | undefined;
private _nextId: number;
- private _incrementalGraph: IncrementalGraph;
constructor(
     earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
abortSignalListener: AbortSignalListener | undefined,
) {
+ this._ids = new Map();
this._earlyReturns = earlyReturns;
this._abortSignalListener = abortSignalListener;
this._nextId = 0;
- this._incrementalGraph = new IncrementalGraph();
}
buildResponse(
     data: ObjMap<unknown>,
     errors: ReadonlyArray<GraphQLError>,
- newDeferredFragmentRecords:
- | ReadonlyArray
- | undefined,
- incrementalDataRecords: ReadonlyArray,
+ work: IncrementalWork,
): ExperimentalIncrementalExecutionResults {
- const newRootNodes = this._incrementalGraph.getNewRootNodes(
- newDeferredFragmentRecords,
- incrementalDataRecords,
- );
+ const { initialGroups, initialStreams, events } = createWorkQueue<
+ ExecutionGroupValue,
+ StreamItemValue,
+ DeferredFragmentRecord,
+ StreamRecord
+ >(work);
- const pending = this._toPendingResults(newRootNodes);
+ const pending = this._toPendingResults(initialGroups, initialStreams);
const initialResult: InitialIncrementalExecutionResult = errors.length
? { errors, data, pending, hasNext: true }
: { data, pending, hasNext: true };
- const subsequentResults = this._incrementalGraph.subscribe((batch) =>
- this._handleCompletedBatch(batch),
+ const subsequentResults = mapAsyncIterable(events, (batch) =>
+ this._handleBatch(batch),
);
return {
@@ -106,169 +94,181 @@ class IncrementalPublisher {
};
}
- private _ensureId(deliveryGroup: DeliveryGroup): string {
- return (deliveryGroup.id ??= String(this._nextId++));
+ private _ensureId(
+ deferredFragmentOrStream: DeferredFragmentRecord | StreamRecord,
+ ): string {
+ let id = this._ids.get(deferredFragmentOrStream);
+ if (id !== undefined) {
+ return id;
+ }
+ id = String(this._nextId++);
+ this._ids.set(deferredFragmentOrStream, id);
+ return id;
}
private _toPendingResults(
- newRootNodes: ReadonlyArray,
+ newGroups: ReadonlyArray,
+ newStreams: ReadonlyArray,
   ): Array<PendingResult> {
     const pendingResults: Array<PendingResult> = [];
- for (const node of newRootNodes) {
- const id = this._ensureId(node);
- const pendingResult: PendingResult = {
- id,
- path: pathToArray(node.path),
- };
- if (node.label !== undefined) {
- pendingResult.label = node.label;
+ for (const collection of [newGroups, newStreams]) {
+ for (const node of collection) {
+ const id = this._ensureId(node);
+ const pendingResult: PendingResult = {
+ id,
+ path: pathToArray(node.path),
+ };
+ if (node.label !== undefined) {
+ pendingResult.label = node.label;
+ }
+ pendingResults.push(pendingResult);
}
- pendingResults.push(pendingResult);
}
return pendingResults;
}
- private _handleCompletedBatch(
- batch: Iterable,
- ): SubsequentIncrementalExecutionResult | undefined {
+ private _handleBatch(
+ batch: ReadonlyArray<
+ WorkQueueEvent<
+ ExecutionGroupValue,
+ StreamItemValue,
+ DeferredFragmentRecord,
+ StreamRecord
+ >
+ >,
+ ): SubsequentIncrementalExecutionResult {
const context: SubsequentIncrementalExecutionResultContext = {
pending: [],
incremental: [],
completed: [],
+ hasNext: true,
};
- for (const completedResult of batch) {
- this._handleCompletedIncrementalData(completedResult, context);
- }
-
- const { incremental, completed } = context;
- if (incremental.length === 0 && completed.length === 0) {
- return;
+ for (const event of batch) {
+ this._handleWorkQueueEvent(event, context);
}
- const hasNext = this._incrementalGraph.hasNext();
+ const { incremental, completed, pending, hasNext } = context;
- const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
- { hasNext };
- const pending = context.pending;
+ const result: SubsequentIncrementalExecutionResult = { hasNext };
if (pending.length > 0) {
- subsequentIncrementalExecutionResult.pending = pending;
+ result.pending = pending;
}
if (incremental.length > 0) {
- subsequentIncrementalExecutionResult.incremental = incremental;
+ result.incremental = incremental;
}
if (completed.length > 0) {
- subsequentIncrementalExecutionResult.completed = completed;
+ result.completed = completed;
}
- return subsequentIncrementalExecutionResult;
+ return result;
}
- private _handleCompletedIncrementalData(
- completedIncrementalData: IncrementalDataRecordResult,
+ private _handleWorkQueueEvent(
+ event: WorkQueueEvent<
+ ExecutionGroupValue,
+ StreamItemValue,
+ DeferredFragmentRecord,
+ StreamRecord
+ >,
context: SubsequentIncrementalExecutionResultContext,
): void {
- if (isCompletedExecutionGroup(completedIncrementalData)) {
- this._handleCompletedExecutionGroup(completedIncrementalData, context);
- } else {
- this._handleCompletedStreamItems(completedIncrementalData, context);
- }
- }
-
- private _handleCompletedExecutionGroup(
- completedExecutionGroup: CompletedExecutionGroup,
- context: SubsequentIncrementalExecutionResultContext,
- ): void {
- if (isFailedExecutionGroup(completedExecutionGroup)) {
- for (const deferredFragmentRecord of completedExecutionGroup
- .pendingExecutionGroup.deferredFragmentRecords) {
- if (
- this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord)
- ) {
- const id = this._ensureId(deferredFragmentRecord);
- context.completed.push({
+ switch (event.kind) {
+ case 'GROUP_VALUES': {
+ const group = event.group;
+ const id = this._ensureId(group);
+ for (const value of event.values) {
+ const { bestId, subPath } = this._getBestIdAndSubPath(
id,
- errors: completedExecutionGroup.errors,
- });
+ group,
+ value,
+ );
+ const incrementalEntry: IncrementalDeferResult = {
+ id: bestId,
+ data: value.data,
+ };
+ if (value.errors !== undefined) {
+ incrementalEntry.errors = value.errors;
+ }
+ if (subPath !== undefined) {
+ incrementalEntry.subPath = subPath;
+ }
+ context.incremental.push(incrementalEntry);
}
+ break;
}
- return;
- }
-
- this._incrementalGraph.addCompletedSuccessfulExecutionGroup(
- completedExecutionGroup,
- );
-
- for (const deferredFragmentRecord of completedExecutionGroup
- .pendingExecutionGroup.deferredFragmentRecords) {
- const completion = this._incrementalGraph.completeDeferredFragment(
- deferredFragmentRecord,
- );
- if (completion === undefined) {
- continue;
+ case 'GROUP_SUCCESS': {
+ const group = event.group;
+ const id = this._ensureId(group);
+ context.completed.push({ id });
+ this._ids.delete(group);
+ if (event.newGroups.length > 0 || event.newStreams.length > 0) {
+ context.pending.push(
+ ...this._toPendingResults(event.newGroups, event.newStreams),
+ );
+ }
+ break;
}
- const id = this._ensureId(deferredFragmentRecord);
- const incremental = context.incremental;
- const { newRootNodes, successfulExecutionGroups } = completion;
- context.pending.push(...this._toPendingResults(newRootNodes));
- for (const successfulExecutionGroup of successfulExecutionGroups) {
- const { bestId, subPath } = this._getBestIdAndSubPath(
+ case 'GROUP_FAILURE': {
+ const { group, error } = event;
+ const id = this._ensureId(group);
+ context.completed.push({
id,
- deferredFragmentRecord,
- successfulExecutionGroup,
+ errors: [error as GraphQLError],
+ });
+ this._ids.delete(group);
+ break;
+ }
+ case 'STREAM_VALUES': {
+ const stream = event.stream;
+ const id = this._ensureId(stream);
+ const { values, newGroups, newStreams } = event;
+        const items: Array<unknown> = [];
+        const errors: Array<GraphQLError> = [];
+ for (const value of values) {
+ items.push(value.item);
+ if (value.errors !== undefined) {
+ errors.push(...value.errors);
+ }
+ }
+ context.incremental.push(
+ errors.length > 0 ? { id, items, errors } : { id, items },
);
- const incrementalEntry: IncrementalDeferResult = {
- ...successfulExecutionGroup.result,
- id: bestId,
- };
- if (subPath !== undefined) {
- incrementalEntry.subPath = subPath;
+ if (newGroups.length > 0 || newStreams.length > 0) {
+ context.pending.push(
+ ...this._toPendingResults(newGroups, newStreams),
+ );
}
- incremental.push(incrementalEntry);
+ break;
}
- context.completed.push({ id });
- }
- }
-
- private _handleCompletedStreamItems(
- streamItemsResult: StreamItemsResult,
- context: SubsequentIncrementalExecutionResultContext,
- ): void {
- const streamRecord = streamItemsResult.streamRecord;
- const id = this._ensureId(streamRecord);
- if (streamItemsResult.errors !== undefined) {
- context.completed.push({
- id,
- errors: streamItemsResult.errors,
- });
- this._incrementalGraph.removeStream(streamRecord);
- const earlyReturn = this._earlyReturns.get(streamRecord);
- if (earlyReturn !== undefined) {
- earlyReturn().catch(() => {
- /* c8 ignore next 1 */
- // ignore error
+ case 'STREAM_SUCCESS': {
+ const stream = event.stream;
+ context.completed.push({
+ id: this._ensureId(stream),
});
- this._earlyReturns.delete(streamRecord);
+ this._ids.delete(stream);
+ this._earlyReturns.delete(stream);
+ break;
}
- } else if (streamItemsResult.result === undefined) {
- context.completed.push({ id });
- this._incrementalGraph.removeStream(streamRecord);
- this._earlyReturns.delete(streamRecord);
- } else {
- const incrementalEntry: IncrementalStreamResult = {
- id,
- ...streamItemsResult.result,
- };
-
- context.incremental.push(incrementalEntry);
-
- const { newDeferredFragmentRecords, incrementalDataRecords } =
- streamItemsResult;
- if (incrementalDataRecords !== undefined) {
- const newRootNodes = this._incrementalGraph.getNewRootNodes(
- newDeferredFragmentRecords,
- incrementalDataRecords,
- );
- context.pending.push(...this._toPendingResults(newRootNodes));
+ case 'STREAM_FAILURE': {
+ const stream = event.stream;
+ context.completed.push({
+ id: this._ensureId(stream),
+ errors: [event.error as GraphQLError],
+ });
+ this._ids.delete(stream);
+ const earlyReturn = this._earlyReturns.get(stream);
+ if (earlyReturn !== undefined) {
+ earlyReturn().catch(() => {
+ /* c8 ignore next 1 */
+ // ignore error
+ });
+ this._earlyReturns.delete(stream);
+ }
+ break;
+ }
+ case 'WORK_QUEUE_TERMINATION': {
+ context.hasNext = false;
+ break;
}
}
}
@@ -276,17 +276,16 @@ class IncrementalPublisher {
private _getBestIdAndSubPath(
initialId: string,
initialDeferredFragmentRecord: DeferredFragmentRecord,
- completedExecutionGroup: CompletedExecutionGroup,
+ executionGroupValue: ExecutionGroupValue,
): { bestId: string; subPath: ReadonlyArray | undefined } {
let maxLength = pathToArray(initialDeferredFragmentRecord.path).length;
let bestId = initialId;
- for (const deferredFragmentRecord of completedExecutionGroup
- .pendingExecutionGroup.deferredFragmentRecords) {
+ for (const deferredFragmentRecord of executionGroupValue.deferredFragmentRecords) {
if (deferredFragmentRecord === initialDeferredFragmentRecord) {
continue;
}
- const id = deferredFragmentRecord.id;
+ const id = this._ids.get(deferredFragmentRecord);
     // TODO: add test case for when a fragment has not been released, but might be processed for the shortest path.
/* c8 ignore next 3 */
if (id === undefined) {
@@ -299,7 +298,7 @@ class IncrementalPublisher {
bestId = id;
}
}
- const subPath = completedExecutionGroup.path.slice(maxLength);
+ const subPath = executionGroupValue.path.slice(maxLength);
return {
bestId,
subPath: subPath.length > 0 ? subPath : undefined,
diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts
index bd9a9a076d..ddab302b53 100644
--- a/src/execution/Queue.ts
+++ b/src/execution/Queue.ts
@@ -1,95 +1,345 @@
+import { invariant } from '../jsutils/invariant.js';
import { isPromise } from '../jsutils/isPromise.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
import { withCleanup } from './withCleanup.js';
+type Settled<T> =
+  | { status: 'fulfilled'; value: T }
+  | { status: 'rejected'; reason: unknown };
+
+interface ItemEntry<T> {
+  kind: 'item';
+  settled?: Settled<T>;
+}
+
+interface StopEntry {
+  kind: 'stop';
+}
+
+type Entry<T> = ItemEntry<T> | StopEntry;
+
+interface BatchRequest<T> {
+  resolve: (generator: Generator<T> | undefined) => void;
+  reject: (reason: unknown) => void;
+}
+
+interface QueueExecutorOptions<T> {
+  push: (item: PromiseOrValue<T>) => PromiseOrValue<void>;
+  stop: (reason?: unknown) => void;
+  started: Promise<void>;
+  stopped: Promise<void>;
+}
+
/**
+ * A Queue is a lightweight async-generator primitive inspired by Brian Kim's
+ * Repeater (https://repeater.js.org, https://github.com/repeaterjs/repeater).
+ * The ergonomics are similar, but this implementation favors clarity over
+ * performance and gives producers flexibility to remain lazy, become eager, or
+ * live somewhere in between.
+ *
+ * The constructor takes an executor function and an optional `initialCapacity`.
+ * Executors receive `{ push, stop, started, stopped }` and may return `void` or
+ * a promise if they perform asynchronous setup. They call `push` whenever
+ * another item is ready, call `stop` when no more values will be produced
+ * (optionally supplying an error), await `started` when setup should run only
+ * after iteration begins, and await `stopped` to observe when the queue
+ * terminates. Because `push` and `stop` are plain functions, executors can
+ * hoist them into outside scopes or pass them to helpers. If the executor
+ * throws or its returned promise rejects, the queue treats it as `stop(error)`
+ * and propagates the failure.
+ *
+ * The `initialCapacity` argument (default `1`) governs backpressure. Capacity
+ * is the maximum number of buffered items allowed before a push must wait.
+ * When the backlog reaches capacity, `push` returns a promise that settles
+ * once consumption releases space; otherwise it returns `undefined`. Setting
+ * capacity to `1` yields a fully lazy queue (every push waits unless a prior
+ * item has been consumed); higher capacities buffer that many items eagerly.
+ * Capacity can be changed later via `setCapacity` and observed via
+ * `getCapacity`.
+ *
+ * `subscribe(reducer)` returns an async generator whose batches feed a generator
+ * of settled values into the reducer; whatever the reducer returns (other than
+ * `undefined`) becomes the yielded value for that batch. Calling `return()` on
+ * the subscription settles pending `next` calls thanks to `withCleanup`,
+ * providing direct abort semantics rather than leaving `next()` suspended.
+ *
+ * `forEachBatch(reducer)` is a convenience method that subscribes with the
+ * given reducer and runs it for each batch until the queue stops.
+ *
+ * Producers can stay lazy by awaiting `started`, keeping the default capacity of `1`, and
+ * awaiting each `push`. Skipping those waits while raising capacity makes the
+ * queue eager up to its configured limit. The `isStopped()` helper exposes
+ * whether the queue has fully stopped, which can be useful when the reducer
+ * function actually performs external work and wants to bail early without
+ * awaiting another `next`.
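+ *
+ * A minimal usage sketch (values and batch boundaries are illustrative):
+ *
+ * ```ts
+ * const queue = new Queue<number>(async ({ push, stop }) => {
+ *   await push(1);
+ *   await push(2);
+ *   stop();
+ * });
+ * for await (const batch of queue.subscribe()) {
+ *   console.log(batch); // batches of consumed items, e.g. [1] then [2]
+ * }
+ * ```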
+ *
* @internal
*/
 export class Queue<T> {
- private _items: Array;
- private _stopped: boolean;
- private _resolvers: Array<(iterable: Generator | undefined) => void>;
+ private _capacity: number;
+ private _backlog = 0;
+ private _waiters: Array<() => void> = [];
+  private _entries: Array<Entry<T>> = [];
+ private _isStopped = false;
+ private _stopRequested = false;
+  private _batchRequests = new Set<BatchRequest<T>>();
+
+ private _resolveStarted: () => void;
+ private _resolveStopped: () => void;
constructor(
- executor: (
- push: (item: T) => void,
- stop: () => void,
- ) => PromiseOrValue,
+ executor: ({
+ push,
+ stop,
+ started,
+ stopped,
+    }: QueueExecutorOptions<T>) => PromiseOrValue<void>,
+ initialCapacity = 1,
) {
- this._items = [];
- this._stopped = false;
- this._resolvers = [];
- let result;
+ this._capacity = this._normalizeCapacity(initialCapacity);
+
+ const { promise: started, resolve: resolveStarted } =
+ // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+      promiseWithResolvers<void>();
+
+ this._resolveStarted = resolveStarted;
+ const { promise: stopped, resolve: resolveStopped } =
+ // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+      promiseWithResolvers<void>();
+ this._resolveStopped = resolveStopped;
+
try {
- result = executor(this._push.bind(this), this.stop.bind(this));
- } catch {
- // ignore sync executor errors
- }
- if (isPromise(result)) {
- result.catch(() => {
- /* ignore async executor errors */
+ const result = executor({
+ push: this._push.bind(this),
+ stop: this._stop.bind(this),
+ started,
+ stopped,
});
+ if (isPromise(result)) {
+ result.catch((error: unknown) => this._stop(error));
+ }
+ } catch (error) {
+ this._stop(error);
}
}
- stop(): void {
- this._stopped = true;
- this._resolve(undefined);
- }
-
   subscribe<U = Array<T>>(
-    mapFn: (generator: Generator<T>) => U | undefined,
+    reducer: (
+      generator: Generator<T>,
+    ) => PromiseOrValue<U> | undefined = (generator) =>
+      Array.from(generator) as U,
   ): AsyncGenerator<U, void, void> {
- return withCleanup(this.subscribeImpl(mapFn), () => this.stop());
+ const iterator = this._iteratorLoop(reducer);
+ return withCleanup(iterator, () => {
+ for (const entry of this._entries) {
+ if (entry.kind === 'item') {
+ this._release();
+ }
+ }
+ this._entries.length = 0;
+ this._batchRequests.forEach((request) => request.resolve(undefined));
+ this._batchRequests.clear();
+ this._stop();
+ });
+ }
+
+  async forEachBatch(
+    reducer: (generator: Generator<T>) => PromiseOrValue<void>,
+  ): Promise<void> {
+ const sub = this.subscribe(async (generator) => {
+ const { promise: drained, resolve } =
+ // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+        promiseWithResolvers<void>();
+
+      const wrappedBatch = (function* wrapper(): Generator<T> {
+ yield* generator;
+ resolve();
+ })();
+
+ await Promise.all([reducer(wrappedBatch), drained]);
+ });
+
+ for await (const _ of sub /* c8 ignore start */) {
+ // intentionally empty
+ } /* c8 ignore stop */
+ }
+
+ setCapacity(nextCapacity: number): void {
+ this._capacity = this._normalizeCapacity(nextCapacity);
+ this._flush();
+ }
+
+ getCapacity(): number {
+ return this._capacity;
+ }
+
+ isStopped(): boolean {
+ return this._isStopped;
}
- private async *subscribeImpl(
- mapFn: (generator: Generator) => U | undefined,
+ private _normalizeCapacity(capacity: number): number {
+ return Math.max(1, Math.floor(capacity));
+ }
+
+ private _flush(): void {
+ while (this._waiters.length > 0 && this._backlog < this._capacity) {
+ this._waiters.shift()?.();
+ }
+ }
+
+  private _reserve(): PromiseOrValue<void> {
+ this._backlog += 1;
+ if (this._backlog < this._capacity) {
+ return undefined;
+ }
+ // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
+    const { promise, resolve } = promiseWithResolvers<void>();
+ this._waiters.push(resolve);
+ return promise;
+ }
+
+ private _release(): void {
+ if (this._backlog > 0) {
+ this._backlog -= 1;
+ }
+ this._flush();
+ }
+
+  private async *_iteratorLoop<U>(
+    reducer: (
+      generator: Generator<T>,
+    ) => PromiseOrValue<U> | undefined,
   ): AsyncGenerator<U, void, void> {
+ this._resolveStarted();
     let nextBatch: Generator<T> | undefined;
// eslint-disable-next-line no-await-in-loop
- while ((nextBatch = await this._nextBatch()) !== undefined) {
- const mapped = mapFn(nextBatch);
- if (mapped !== undefined) {
- yield mapped;
+ while ((nextBatch = await this._waitForNextBatch())) {
+ let reduced = reducer(nextBatch);
+ if (isPromise(reduced)) {
+ // eslint-disable-next-line no-await-in-loop
+ reduced = await reduced;
}
+ if (reduced === undefined) {
+ continue;
+ }
+ yield reduced;
}
}
- private _nextBatch(): Promise | undefined> {
- if (this._items.length) {
- return Promise.resolve(this.batch());
- }
- if (this._stopped) {
- return Promise.resolve(undefined);
- }
- const { promise, resolve } = promiseWithResolvers<
+  private _waitForNextBatch(): Promise<Generator<T> | undefined> {
+ const { promise, resolve, reject } = promiseWithResolvers<
       Generator<T> | undefined
>();
- this._resolvers.push(resolve);
+ this._batchRequests.add({ resolve, reject });
+ this._deliverBatchIfReady();
return promise;
}
- private _push(item: T): void {
- if (!this._stopped) {
- this._items.push(item);
- this._resolve(this.batch());
+  private _push(item: PromiseOrValue<T>): PromiseOrValue<void> {
+ if (this._stopRequested) {
+ return undefined;
}
+ const maybePushPromise = this._reserve();
+ if (isPromise(item)) {
+      const entry: ItemEntry<T> = { kind: 'item' };
+ this._entries.push(entry);
+ item.then(
+ (resolved) => {
+ entry.settled = { status: 'fulfilled', value: resolved };
+ this._deliverBatchIfReady();
+ },
+ (reason: unknown) => {
+ entry.settled = { status: 'rejected', reason };
+ this._deliverBatchIfReady();
+ },
+ );
+ } else {
+ this._entries.push({
+ kind: 'item',
+ settled: { status: 'fulfilled', value: item },
+ });
+ this._deliverBatchIfReady();
+ }
+ return maybePushPromise;
}
- private _resolve(maybeIterable: Generator | undefined): void {
- for (const resolve of this._resolvers) {
- resolve(maybeIterable);
+ private _stop(reason?: unknown): void {
+ if (this._stopRequested) {
+ return;
+ }
+ this._stopRequested = true;
+ if (reason === undefined) {
+ if (this._entries.length === 0) {
+ this._isStopped = true;
+ this._resolveStopped();
+ this._deliverBatchIfReady();
+ return;
+ }
+
+ this._entries.push({ kind: 'stop' });
+ this._deliverBatchIfReady();
+ return;
}
- this._resolvers = [];
+
+ this._entries.push({
+ kind: 'item',
+ settled: { status: 'rejected', reason },
+ });
+ this._entries.push({ kind: 'stop' });
+ this._deliverBatchIfReady();
}
- private *batch(): Generator {
- let item: T | undefined;
- while ((item = this._items.shift()) !== undefined) {
- yield item;
+ private _deliverBatchIfReady(): void {
+ if (!this._batchRequests.size) {
+ return;
+ }
+ const headEntry = this._entries[0];
+ const requests = this._batchRequests;
+ if (headEntry !== undefined) {
+ // stop sentinel always follows other work
+ invariant(headEntry.kind !== 'stop');
+
+ const settled = headEntry.settled;
+ if (settled !== undefined) {
+ if (settled.status === 'fulfilled') {
+ this._batchRequests = new Set();
+ requests.forEach((request) => request.resolve(this._drainBatch()));
+ return;
+ }
+ this._entries.shift();
+ this._release();
+ this._isStopped = true;
+ this._resolveStopped();
+ this._batchRequests = new Set();
+ requests.forEach((request) => request.reject(settled.reason));
+ }
+ } else if (this._isStopped) {
+ this._batchRequests = new Set();
+ requests.forEach((request) => request.resolve(undefined));
+ }
+ }
+
+  private *_drainBatch(): Generator<T> {
+ while (true) {
+ const entry = this._entries[0];
+ if (entry === undefined) {
+ return;
+ }
+ if (entry.kind === 'stop') {
+ this._isStopped = true;
+ this._entries.shift();
+ this._resolveStopped();
+ return;
+ }
+ const settled = entry.settled;
+ if (settled === undefined || settled.status === 'rejected') {
+ return;
+ }
+ this._entries.shift();
+ this._release();
+ yield settled.value;
}
}
}
diff --git a/src/execution/WorkQueue.ts b/src/execution/WorkQueue.ts
new file mode 100644
index 0000000000..7907f0f107
--- /dev/null
+++ b/src/execution/WorkQueue.ts
@@ -0,0 +1,664 @@
+import { isPromise } from '../jsutils/isPromise.js';
+import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
+
+import { Queue } from './Queue.js';
+
+export interface Group<TSelf extends Group<TSelf>> {
+  parent?: TSelf | undefined;
+}
+
+interface WorkResult<
+  TValue,
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  value: TValue;
+  work?: Work<T, I, G, S> | undefined;
+}
+
+export interface Stream<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  queue: Queue<StreamItem<T, I, G, S>>;
+}
+
+export interface Work<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> {
+  groups?: ReadonlyArray<G>;
+  tasks?: ReadonlyArray<Task<T, I, G, S>>;
+  streams?: ReadonlyArray<S>;
+}
+
+interface NewWork<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> {
+  newGroups: ReadonlyArray<G>;
+  newStreams: ReadonlyArray<S>;
+}
+
+export interface WorkQueue<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  initialGroups: ReadonlyArray<G>;
+  initialStreams: ReadonlyArray<S>;
+  events: AsyncGenerator<ReadonlyArray<WorkQueueEvent<T, I, G, S>>>;
+}
+
+export type StreamItem<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> = WorkResult<I, T, I, G, S>;
+
+export type TaskResult<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> = WorkResult<T, T, I, G, S>;
+
+type MaybePromise<T> =
+  | { status: 'fulfilled'; value: T }
+  | { status: 'pending'; promise: Promise<T> }
+  | { status: 'rejected'; reason: unknown };
+
+/** @internal **/
+export class Task<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> {
+  readonly groups: ReadonlyArray<G>;
+  private _fn: () => PromiseOrValue<TaskResult<T, I, G, S>>;
+  private _maybePromise?: MaybePromise<TaskResult<T, I, G, S>>;
+  constructor(
+    fn: () => PromiseOrValue<TaskResult<T, I, G, S>>,
+    groups: ReadonlyArray<G>,
+  ) {
+    this._fn = fn;
+    this.groups = groups;
+  }
+  start(): MaybePromise<TaskResult<T, I, G, S>> {
+ if (this._maybePromise) {
+ return this._maybePromise;
+ }
+ try {
+ const result = this._fn();
+ if (isPromise(result)) {
+ this._maybePromise = { status: 'pending', promise: result };
+ result.then(
+ (value) => {
+ this._maybePromise = { status: 'fulfilled', value };
+ },
+ (reason: unknown) => {
+ this._maybePromise = { status: 'rejected', reason };
+ },
+ );
+ } else {
+ this._maybePromise = { status: 'fulfilled', value: result };
+ }
+ } catch (reason: unknown) {
+ this._maybePromise = { status: 'rejected', reason };
+ }
+ return this._maybePromise;
+ }
+  result(): PromiseOrValue<TaskResult<T, I, G, S>> {
+ const maybePromise = this.start();
+ switch (maybePromise.status) {
+ case 'fulfilled':
+ return maybePromise.value;
+ case 'rejected':
+ throw maybePromise.reason;
+ case 'pending': {
+ return maybePromise.promise;
+ }
+ }
+ }
+}
+
+interface TaskSuccessGraphEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'TASK_SUCCESS';
+  task: Task<T, I, G, S>;
+  result: TaskResult<T, I, G, S>;
+}
+
+interface TaskFailureGraphEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'TASK_FAILURE';
+  task: Task<T, I, G, S>;
+  error: unknown;
+}
+
+interface StreamItemsEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'STREAM_ITEMS';
+  stream: S;
+  items: Generator<StreamItem<T, I, G, S>>;
+}
+
+interface StreamSuccessEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'STREAM_SUCCESS';
+  stream: S;
+}
+
+interface StreamFailureEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'STREAM_FAILURE';
+  stream: S;
+  error: unknown;
+}
+
+type GraphEvent<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> =
+  | TaskSuccessGraphEvent<T, I, G, S>
+  | TaskFailureGraphEvent<T, I, G, S>
+  | StreamItemsEvent<T, I, G, S>
+  | StreamSuccessEvent<T, I, G, S>
+  | StreamFailureEvent<T, I, G, S>;
+
+interface GroupValuesEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> {
+  kind: 'GROUP_VALUES';
+  group: G;
+  values: ReadonlyArray<T>;
+}
+
+interface GroupSuccessEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> extends NewWork<T, I, G, S> {
+  kind: 'GROUP_SUCCESS';
+  group: G;
+}
+
+interface GroupFailureEvent<G extends Group<G>> {
+  kind: 'GROUP_FAILURE';
+  group: G;
+  error: unknown;
+}
+
+interface StreamValuesEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> extends NewWork<T, I, G, S> {
+  kind: 'STREAM_VALUES';
+  stream: S;
+  values: ReadonlyArray<I>;
+}
+
+interface WorkQueueTerminationEvent {
+  kind: 'WORK_QUEUE_TERMINATION';
+}
+
+export type WorkQueueEvent<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+> =
+  | GroupValuesEvent<T, I, G, S>
+  | GroupSuccessEvent<T, I, G, S>
+  | GroupFailureEvent<G>
+  | StreamValuesEvent<T, I, G, S>
+  | StreamSuccessEvent<T, I, G, S>
+  | StreamFailureEvent<T, I, G, S>
+  | WorkQueueTerminationEvent;
+
+interface GroupNode<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> {
+  childGroups: Array<G>;
+  tasks: Set<Task<T, I, G, S>>;
+  pending: number;
+}
+
+interface TaskNode<T, I, G extends Group<G>, S extends Stream<T, I, G, S>> {
+  value: T | undefined;
+  childStreams: Array<S>;
+}
+
+/** @internal */
+export function createWorkQueue<
+  T,
+  I,
+  G extends Group<G>,
+  S extends Stream<T, I, G, S>,
+>(initialWork: Work<T, I, G, S> | undefined): WorkQueue<T, I, G, S> {
+  const rootGroups = new Set<G>();
+  const rootStreams = new Set<S>();
+  const groupNodes = new Map<G, GroupNode<T, I, G, S>>();
+  const taskNodes = new Map<Task<T, I, G, S>, TaskNode<T, I, G, S>>();
+  let pushGraphEvent!: (e: GraphEvent<T, I, G, S>) => PromiseOrValue<void>;
+ let stopGraphEvents!: (err?: unknown) => void;
+
+ const { newGroups: initialRootGroups, newStreams: initialRootStreams } =
+ maybeIntegrateWork(initialWork);
+ const nonEmptyInitialRootGroups = pruneEmptyGroups(initialRootGroups);
+
+  const events = new Queue<GraphEvent<T, I, G, S>>(
+ ({ push: _push, stop: _stop, started: _started }) => {
+ pushGraphEvent = _push;
+ stopGraphEvents = _stop;
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ _started.then(() =>
+ startWork(nonEmptyInitialRootGroups, initialRootStreams),
+ );
+ },
+ 1,
+ ).subscribe((graphEvents) => handleGraphEvents(graphEvents));
+
+ return {
+ initialGroups: nonEmptyInitialRootGroups,
+ initialStreams: initialRootStreams,
+ events,
+ };
+
+  function maybeIntegrateWork(
+    work: Work<T, I, G, S> | undefined,
+    parentTask?: Task<T, I, G, S>,
+  ): NewWork<T, I, G, S> {
+ if (!work) {
+ return { newGroups: [], newStreams: [] };
+ }
+ const { groups, tasks, streams } = work;
+ const newGroups = groups ? addGroups(groups, parentTask) : [];
+ if (tasks) {
+ for (const task of tasks) {
+ addTask(task);
+ }
+ }
+ const newStreams = streams ? addStreams(streams, parentTask) : [];
+ return { newGroups, newStreams };
+ }
+
+ function addGroups(
+    originalGroups: ReadonlyArray<G>,
+    parentTask?: Task<T, I, G, S>,
+  ): Array<G> {
+ const groupSet = new Set(originalGroups);
+    const visited = new Set<G>();
+    const newRootGroups: Array<G> = [];
+ for (const group of originalGroups) {
+ addGroup(group, groupSet, newRootGroups, visited, parentTask);
+ }
+ return newRootGroups;
+ }
+
+ function addGroup(
+ group: G,
+    groupSet: ReadonlySet<G>,
+    newRootGroups: Array<G>,
+    visited: Set<G>,
+    parentTask?: Task<T, I, G, S>,
+ ): void {
+ if (visited.has(group)) {
+ return;
+ }
+ visited.add(group);
+ const parent = group.parent;
+ if (parent !== undefined && groupSet.has(parent)) {
+ addGroup(parent, groupSet, newRootGroups, visited, parentTask);
+ }
+
+    const groupNode: GroupNode<T, I, G, S> = {
+ childGroups: [],
+ tasks: new Set(),
+ pending: 0,
+ };
+ groupNodes.set(group, groupNode);
+
+ if (parentTask === undefined && !parent) {
+ newRootGroups.push(group);
+ } else if (parent) {
+ groupNodes.get(parent)?.childGroups.push(group);
+ }
+ }
+
+  function addTask(task: Task<T, I, G, S>): void {
+ for (const group of task.groups) {
+ const groupNode = groupNodes.get(group);
+ if (groupNode) {
+ groupNode.tasks.add(task);
+ groupNode.pending++;
+ if (rootGroups.has(group)) {
+ startTask(task);
+ }
+ }
+ }
+ }
+
+ function addStreams(
+    streams: ReadonlyArray<S>,
+    parentTask?: Task<T, I, G, S>,
+  ): ReadonlyArray<S> {
+ if (!parentTask) {
+ return streams;
+ }
+ const taskNode = taskNodes.get(parentTask);
+ if (taskNode) {
+ taskNode.childStreams.push(...streams);
+ }
+ return [];
+ }
+
+ function pruneEmptyGroups(
+    newGroups: ReadonlyArray<G>,
+    nonEmptyNewGroups: Array<G> = [],
+  ): ReadonlyArray<G> {
+ for (const newGroup of newGroups) {
+ const newGroupState = groupNodes.get(newGroup);
+ if (newGroupState) {
+ if (newGroupState.pending === 0) {
+ groupNodes.delete(newGroup);
+ pruneEmptyGroups(newGroupState.childGroups, nonEmptyNewGroups);
+ } else {
+ nonEmptyNewGroups.push(newGroup);
+ }
+ }
+ }
+ return nonEmptyNewGroups;
+ }
+
+ function startWork(
+    newGroups: ReadonlyArray<G>,
+    newStreams: ReadonlyArray<S>,
+ ): void {
+ for (const group of newGroups) {
+ startGroup(group);
+ }
+ for (const stream of newStreams) {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ startStream(stream);
+ }
+ }
+
+ function startGroup(group: G): void {
+ rootGroups.add(group);
+ const groupNode = groupNodes.get(group);
+ if (groupNode) {
+ for (const task of groupNode.tasks) {
+ startTask(task);
+ }
+ }
+ }
+
+  function startTask(task: Task<T, I, G, S>): void {
+ if (taskNodes.has(task)) {
+ return;
+ }
+ taskNodes.set(task, {
+ value: undefined,
+ childStreams: [],
+ });
+ try {
+ const result = task.result();
+ if (isPromise(result)) {
+ result.then(
+ (resolved) => {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'TASK_SUCCESS', task, result: resolved });
+ },
+ (error: unknown) => {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'TASK_FAILURE', task, error });
+ },
+ );
+ } else {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'TASK_SUCCESS', task, result });
+ }
+ } catch (error) {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'TASK_FAILURE', task, error });
+ }
+ }
+
+  async function startStream(stream: S): Promise<void> {
+ rootStreams.add(stream);
+ try {
+ await stream.queue.forEachBatch(async (items) => {
+ const pushed = pushGraphEvent({
+ kind: 'STREAM_ITEMS',
+ stream,
+ items,
+ });
+ if (isPromise(pushed)) {
+ await pushed;
+ }
+ });
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'STREAM_SUCCESS', stream });
+ } catch (error) {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ pushGraphEvent({ kind: 'STREAM_FAILURE', stream, error });
+ }
+ }
+
+ function handleGraphEvents(
+    graphEvents: Generator<GraphEvent<T, I, G, S>>,
+  ): ReadonlyArray<WorkQueueEvent<T, I, G, S>> | undefined {
+    const workQueueEvents: Array<WorkQueueEvent<T, I, G, S>> = [];
+ for (const graphEvent of graphEvents) {
+ switch (graphEvent.kind) {
+ case 'TASK_SUCCESS':
+ workQueueEvents.push(...taskSuccess(graphEvent));
+ break;
+ case 'TASK_FAILURE':
+ workQueueEvents.push(...taskFailure(graphEvent));
+ break;
+ case 'STREAM_ITEMS':
+ workQueueEvents.push(...streamItems(graphEvent));
+ break;
+ case 'STREAM_SUCCESS':
+ // check whether already deleted within streamItems()
+ if (rootStreams.has(graphEvent.stream)) {
+ rootStreams.delete(graphEvent.stream);
+ workQueueEvents.push(graphEvent);
+ }
+ break;
+ case 'STREAM_FAILURE':
+ rootStreams.delete(graphEvent.stream);
+ workQueueEvents.push(graphEvent);
+ break;
+ }
+ }
+
+ if (rootGroups.size === 0 && rootStreams.size === 0) {
+ stopGraphEvents();
+ workQueueEvents.push({ kind: 'WORK_QUEUE_TERMINATION' });
+ }
+
+ return workQueueEvents.length > 0 ? workQueueEvents : undefined;
+ }
+
+ function taskSuccess(
+    graphEvent: TaskSuccessGraphEvent<T, I, G, S>,
+  ): ReadonlyArray<
+    GroupValuesEvent<T, I, G, S> | GroupSuccessEvent<T, I, G, S>
+ > {
+ const { task, result } = graphEvent;
+ const { value, work } = result;
+ const taskNode = taskNodes.get(task);
+ if (taskNode) {
+ taskNode.value = value;
+ }
+ maybeIntegrateWork(work, task);
+
+ const groupEvents: Array<
+      GroupValuesEvent<T, I, G, S> | GroupSuccessEvent<T, I, G, S>
+    > = [];
+    const newGroups: Array<G> = [];
+    const newStreams: Array<S> = [];
+ for (const group of task.groups) {
+ const groupNode = groupNodes.get(group);
+ if (groupNode) {
+ groupNode.pending--;
+ if (rootGroups.has(group) && groupNode.pending === 0) {
+ const {
+ groupValuesEvent,
+ groupSuccessEvent,
+ newGroups: childNewGroups,
+ newStreams: childNewStreams,
+ } = finishGroupSuccess(group, groupNode);
+ if (groupValuesEvent) {
+ groupEvents.push(groupValuesEvent);
+ }
+ groupEvents.push(groupSuccessEvent);
+ newGroups.push(...childNewGroups);
+ newStreams.push(...childNewStreams);
+ }
+ }
+ }
+
+ startWork(newGroups, newStreams);
+ return groupEvents;
+ }
+
+ function taskFailure(
+    graphEvent: TaskFailureGraphEvent<T, I, G, S>,
+  ): ReadonlyArray<GroupFailureEvent<G>> {
+ const { task, error } = graphEvent;
+ taskNodes.delete(task);
+    const groupFailureEvents: Array<GroupFailureEvent<G>> = [];
+ for (const group of task.groups) {
+ const groupNode = groupNodes.get(group);
+ if (groupNode) {
+ groupFailureEvents.push(finishGroupFailure(group, groupNode, error));
+ }
+ }
+ return groupFailureEvents;
+ }
+
+ function streamItems(
+    graphEvent: StreamItemsEvent<T, I, G, S>,
+  ):
+    | [StreamValuesEvent<T, I, G, S>]
+    | [StreamValuesEvent<T, I, G, S>, StreamSuccessEvent<T, I, G, S>] {
+    const { stream, items } = graphEvent;
+    const values: Array<I> = [];
+    const newGroups: Array<G> = [];
+    const newStreams: Array<S> = [];
+ for (const { value, work } of items) {
+ const { newGroups: itemNewGroups, newStreams: itemNewStreams } =
+ maybeIntegrateWork(work);
+ const nonEmptyNewGroups = pruneEmptyGroups(itemNewGroups);
+ startWork(nonEmptyNewGroups, itemNewStreams);
+ values.push(value);
+ newGroups.push(...nonEmptyNewGroups);
+ newStreams.push(...itemNewStreams);
+ }
+    const streamValuesEvent: StreamValuesEvent<T, I, G, S> = {
+ kind: 'STREAM_VALUES',
+ stream,
+ values,
+ newGroups,
+ newStreams,
+ };
+
+    // queues allow peeking ahead to see if the stream has stopped
+ if (stream.queue.isStopped()) {
+ rootStreams.delete(stream);
+ return [streamValuesEvent, { kind: 'STREAM_SUCCESS', stream }];
+ }
+ return [streamValuesEvent];
+ }
+
+ function finishGroupSuccess(
+ group: G,
+    groupNode: GroupNode<T, I, G, S>,
+  ): {
+    groupValuesEvent: GroupValuesEvent<T, I, G, S> | undefined;
+    groupSuccessEvent: GroupSuccessEvent<T, I, G, S>;
+    newGroups: ReadonlyArray<G>;
+    newStreams: ReadonlyArray<S>;
+ } {
+ groupNodes.delete(group);
+    const values: Array<T> = [];
+    const newStreams: Array<S> = [];
+ for (const task of groupNode.tasks) {
+ const taskNode = taskNodes.get(task);
+ if (taskNode) {
+ const { value, childStreams } = taskNode;
+ if (value !== undefined) {
+ values.push(value);
+ }
+ for (const childStream of childStreams) {
+ newStreams.push(childStream);
+ }
+ removeTask(task);
+ }
+ }
+ const newGroups = pruneEmptyGroups(groupNode.childGroups);
+ rootGroups.delete(group);
+ return {
+ groupValuesEvent: values.length
+ ? { kind: 'GROUP_VALUES', group, values }
+ : undefined,
+ groupSuccessEvent: {
+ kind: 'GROUP_SUCCESS',
+ group,
+ newGroups,
+ newStreams,
+ },
+ newGroups,
+ newStreams,
+ };
+ }
+
+ function finishGroupFailure(
+ group: G,
+    groupNode: GroupNode<T, I, G, S>,
+ error: unknown,
+  ): GroupFailureEvent<G> {
+ removeGroup(group, groupNode);
+ rootGroups.delete(group);
+ return { kind: 'GROUP_FAILURE', group, error };
+ }
+
+  function removeGroup(group: G, groupNode: GroupNode<T, I, G, S>): void {
+ groupNodes.delete(group);
+ for (const childGroup of groupNode.childGroups) {
+ const childGroupState = groupNodes.get(childGroup);
+ if (childGroupState) {
+ removeGroup(childGroup, childGroupState);
+ }
+ }
+ }
+
+  function removeTask(task: Task<T, I, G, S>): void {
+ for (const group of task.groups) {
+ const groupNode = groupNodes.get(group);
+ groupNode?.tasks.delete(task);
+ }
+ taskNodes.delete(task);
+ }
+}
diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts
index 405deaa6b1..1706b3128d 100644
--- a/src/execution/__tests__/Queue-test.ts
+++ b/src/execution/__tests__/Queue-test.ts
@@ -1,36 +1,48 @@
+/* eslint-disable @typescript-eslint/no-floating-promises */
+
import { expect } from 'chai';
import { describe, it } from 'mocha';
+import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
+import { invariant } from '../../jsutils/invariant.js';
+import { isPromise } from '../../jsutils/isPromise.js';
+import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js';
+import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
+
import { Queue } from '../Queue.js';
describe('Queue', () => {
- it('should yield sync pushed items in order', async () => {
- const queue = new Queue