Skip to content

Commit 6feba92

Browse files
committed
More
1 parent cdab978 commit 6feba92

File tree

2 files changed

+109
-66
lines changed

2 files changed

+109
-66
lines changed

arangod/Aql/ExecutionBlockImpl.h

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -341,18 +341,32 @@ class ExecutionBlockImpl final : public ExecutionBlock {
341341

342342
auto countShadowRowProduced(AqlCallStack& stack, size_t depth) -> void;
343343

344+
auto forwardShadowRow(AqlCallStack& stack,
345+
std::unique_ptr<OutputAqlItemRow>& _outputItemRow,
346+
ShadowAqlItemRow& shadowRow) -> void;
347+
348+
enum class SideEffectSkipResult {
349+
FORWARD_SHADOW_ROW,
350+
DROP_SHADOW_ROW,
351+
RETURN_DONE
352+
};
353+
auto sideEffectSkipHandling(AqlCallStack& stack,
354+
std::unique_ptr<OutputAqlItemRow>& _outputItemRow,
355+
ShadowAqlItemRow& shadowRow, SkipResult& skipped)
356+
-> SideEffectSkipResult;
357+
344358
private:
345359
/**
346360
* @brief The PrefetchTask is used to asynchronously prefetch the next batch
347361
* from upstream. Each block holds only a single instance (if any), so each
348362
* block can have max one pending async prefetch task. This instance is
349-
* created on demand when the first async request is spawned, later tasks can
350-
* reuse that instance.
351-
* The async task is queued on the global scheduler so it can be picked up by
352-
* some worker thread. However, sometimes the original thread might that
353-
* created the task might be faster, in which case we don't want to wait
354-
* until a worker has picked up the task. Instead, any thread that wants to
355-
* process the task has to _claim_ it. This is managed via the task's `state`.
363+
* created on demand when the first async request is spawned, later tasks
364+
* can reuse that instance. The async task is queued on the global scheduler
365+
* so it can be picked up by some worker thread. However, sometimes the
366+
* original thread that created the task might be faster, in which
367+
* case we don't want to wait until a worker has picked up the task.
368+
* Instead, any thread that wants to process the task has to _claim_ it.
369+
* This is managed via the task's `state`.
356370
*
357371
* Before the task is queued on the scheduler, `state` is set to `Pending`.
358372
* When a thread wants to process the task, it must call `tryClaim` which

arangod/Aql/ExecutionBlockImpl.tpp

Lines changed: 88 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,69 @@ auto ExecutionBlockImpl<Executor>::shadowRowForwardingSubqueryEnd(
13211321
}
13221322
}
13231323

1324+
template<class Executor>
1325+
auto ExecutionBlockImpl<Executor>::forwardShadowRow(
1326+
AqlCallStack& stack, std::unique_ptr<OutputAqlItemRow>& _outputItemRow,
1327+
ShadowAqlItemRow& shadowRow) -> void {
1328+
auto shadowDepth = shadowRow.getDepth();
1329+
auto& shadowCall = stack.modifyCallAtDepth(shadowDepth);
1330+
_outputItemRow->moveRow(shadowRow);
1331+
shadowCall.didProduce(1);
1332+
TRI_ASSERT(_outputItemRow->produced());
1333+
_outputItemRow->advanceRow();
1334+
1335+
// The call at the next level produced the stuff for this shadow row
1336+
// and hence should be popped.
1337+
// TODO: Technically all calls of lower depth should be popped (and are
1338+
// at the end of this function). This means that the call at shadowDepth-1
1339+
// is popped twice. This has no effect since popping a call is idempotent.
1340+
if (!shadowRow.isRelevant()) {
1341+
std::ignore = stack.modifyCallListAtDepth(shadowDepth - 1).popNextCall();
1342+
}
1343+
}
1344+
1345+
template<class Executor>
1346+
auto ExecutionBlockImpl<Executor>::sideEffectSkipHandling(
1347+
AqlCallStack& stack, std::unique_ptr<OutputAqlItemRow>& _outputItemRow,
1348+
ShadowAqlItemRow& shadowRow, SkipResult& skipped) -> SideEffectSkipResult {
1349+
auto shadowDepth = shadowRow.getDepth();
1350+
1351+
auto maybeDepthSkippingNow = stack.shadowRowDepthToSkip();
1352+
if (maybeDepthSkippingNow.has_value()) {
1353+
auto depthSkippingNow = maybeDepthSkippingNow.value();
1354+
if (depthSkippingNow > shadowDepth) {
1355+
if (!stack.modifyCallListAtDepth(depthSkippingNow).hasMoreCalls()) {
1356+
return SideEffectSkipResult::RETURN_DONE;
1357+
}
1358+
return SideEffectSkipResult::DROP_SHADOW_ROW;
1359+
// We are skipping the outermost Subquery.
1360+
// Simply drop this ShadowRow
1361+
} else if (depthSkippingNow == shadowDepth) {
1362+
AqlCall& shadowCall = stack.modifyCallAtDepth(shadowDepth);
1363+
// We are skipping on this subquery level.
1364+
// Skip the row, but report skipped 1.
1365+
if (shadowCall.needSkipMore()) {
1366+
shadowCall.didSkip(1);
1367+
shadowCall.resetSkipCount();
1368+
skipped.didSkipSubquery(1, shadowDepth);
1369+
// also does not write the shadow row.
1370+
return SideEffectSkipResult::DROP_SHADOW_ROW;
1371+
} else if (shadowCall.getLimit() > 0) {
1372+
TRI_ASSERT(!shadowCall.needSkipMore() && shadowCall.getLimit() > 0);
1373+
return SideEffectSkipResult::FORWARD_SHADOW_ROW;
1374+
} else {
1375+
TRI_ASSERT(shadowCall.hardLimit == 0);
1376+
// Simply drop this shadowRow!
1377+
return SideEffectSkipResult::DROP_SHADOW_ROW;
1378+
}
1379+
} else /* depthSkippingNow < shadowDepth */ {
1380+
// We got a shadowRow of a subquery we are not skipping here.
1381+
// Do proper reporting on its call.
1382+
}
1383+
}
1384+
return SideEffectSkipResult::FORWARD_SHADOW_ROW;
1385+
}
1386+
13241387
template<class Executor>
13251388
auto ExecutionBlockImpl<Executor>::shadowRowForwarding(AqlCallStack& stack)
13261389
-> ExecState {
@@ -1334,9 +1397,16 @@ auto ExecutionBlockImpl<Executor>::shadowRowForwarding(AqlCallStack& stack)
13341397
TRI_ASSERT(_outputItemRow->isInitialized());
13351398
TRI_ASSERT(!_outputItemRow->allRowsUsed());
13361399
if (!_lastRange.hasShadowRow()) {
1337-
// We got back without a ShadowRow in the LastRange
1338-
// Let us continue with the next Subquery
1339-
return ExecState::NEXTSUBQUERY;
1400+
// TODO: the original sideEffectShadowRowForwarding returns
1401+
// ExecState::DONE in this case. It is likely correct
1402+
// to just return ExecState::NEXTSUBQUERY and remove this
1403+
// case distinction.
1404+
// Let's do that in a separate PR. In particular keep UPSERT in mind.
1405+
if constexpr (executorHasSideEffects<Executor>) {
1406+
return ExecState::DONE;
1407+
} else {
1408+
return ExecState::NEXTSUBQUERY;
1409+
}
13401410
}
13411411

13421412
auto&& [state, shadowRow] = _lastRange.nextShadowRow();
@@ -1363,69 +1433,28 @@ auto ExecutionBlockImpl<Executor>::shadowRowForwarding(AqlCallStack& stack)
13631433
// ranges synchronize shadow rows, and fetcher synchronizes skipping
13641434
//
13651435
// but there are interactions between the two.
1436+
//
1437+
// Also note that executors with this DataRange type have no side effects,
1438+
// so merging the two functions can leave this in place
13661439
if constexpr (std::is_same_v<DataRange, MultiAqlItemBlockInputRange>) {
1440+
auto shadowDepth = shadowRow.getDepth();
13671441
fetcher().resetDidReturnSubquerySkips(shadowDepth);
13681442
}
13691443

1370-
// TODO: make member/function in namespace {}
1371-
auto writeShadowRow = [](AqlCallStack& stack,
1372-
std::unique_ptr<OutputAqlItemRow>& _outputItemRow,
1373-
ShadowAqlItemRow& shadowRow) {
1374-
auto shadowDepth = shadowRow.getDepth();
1375-
auto& shadowCall = stack.modifyCallAtDepth(shadowDepth);
1376-
_outputItemRow->moveRow(shadowRow);
1377-
shadowCall.didProduce(1);
1378-
TRI_ASSERT(_outputItemRow->produced());
1379-
_outputItemRow->advanceRow();
1380-
1381-
// TODO Why? This comes from countShadowRowProduced
1382-
//
1383-
if (!shadowRow.isRelevant()) {
1384-
std::ignore = stack.modifyCallListAtDepth(shadowDepth - 1).popNextCall();
1385-
}
1386-
};
1387-
13881444
if constexpr (executorHasSideEffects<Executor>) {
1389-
// finds the *highest depth* (equivalently *outermost*) entry in
1390-
// the call stack that either
1391-
// - has no calls, in which case we cannot do anything and have to return
1392-
// ExecState::DONE
1393-
// - has a call which needs to skip or has a limit
1394-
auto maybeDepthSkippingNow = stack.shadowRowDepthToSkip();
1395-
if (maybeDepthSkippingNow.has_value()) {
1396-
auto depthSkippingNow = maybeDepthSkippingNow.value();
1397-
if (depthSkippingNow > shadowDepth) {
1398-
if (!stack.modifyCallListAtDepth(depthSkippingNow).hasMoreCalls()) {
1399-
return ExecState::DONE;
1400-
}
1401-
// We are skipping the outermost Subquery.
1402-
// Simply drop this ShadowRow
1403-
} else if (depthSkippingNow == shadowDepth) {
1404-
AqlCall& shadowCall = stack.modifyCallAtDepth(shadowDepth);
1405-
// We are skipping on this subquery level.
1406-
// Skip the row, but report skipped 1.
1407-
if (shadowCall.needSkipMore()) {
1408-
shadowCall.didSkip(1);
1409-
shadowCall.resetSkipCount();
1410-
_skipped.didSkipSubquery(1, shadowDepth);
1411-
// also does not write the shadow row.
1412-
} else if (shadowCall.getLimit() > 0) {
1413-
TRI_ASSERT(!shadowCall.needSkipMore() && shadowCall.getLimit() > 0);
1414-
writeShadowRow(stack, _outputItemRow, shadowRow);
1415-
} else {
1416-
TRI_ASSERT(shadowCall.hardLimit == 0);
1417-
// Simply drop this shadowRow!
1418-
}
1419-
} else /* depthSkippingNow < shadowDepth */ {
1420-
// We got a shadowRow of a subquery we are not skipping here.
1421-
// Do proper reporting on its call.
1422-
writeShadowRow(stack, _outputItemRow, shadowRow);
1423-
}
1424-
} else {
1425-
writeShadowRow(stack, _outputItemRow, shadowRow);
1445+
auto r = sideEffectSkipHandling(stack, _outputItemRow, shadowRow, _skipped);
1446+
switch (r) {
1447+
case SideEffectSkipResult::RETURN_DONE: {
1448+
return ExecState::DONE;
1449+
} break;
1450+
case SideEffectSkipResult::FORWARD_SHADOW_ROW: {
1451+
forwardShadowRow(stack, _outputItemRow, shadowRow);
1452+
} break;
1453+
case SideEffectSkipResult::DROP_SHADOW_ROW: {
1454+
} break;
14261455
}
14271456
} else {
1428-
writeShadowRow(stack, _outputItemRow, shadowRow);
1457+
forwardShadowRow(stack, _outputItemRow, shadowRow);
14291458
}
14301459

14311460
if (state == ExecutorState::DONE) {

0 commit comments

Comments
 (0)