diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 012b909096317..1e010fc12f3d4 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -114,6 +114,9 @@ class DataRelayer using OnDropCallback = std::function&, TimesliceIndex::OldestOutputInfo info)>; + // Callback for when some messages are about to be owned by the DataRelayer + using OnInsertionCallback = std::function&)>; + /// Prune all the pending entries in the cache. void prunePending(OnDropCallback); /// Prune the cache for a given slot @@ -135,6 +138,7 @@ class DataRelayer InputInfo const& info, size_t nMessages, size_t nPayloads = 1, + OnInsertionCallback onInsertion = nullptr, OnDropCallback onDrop = nullptr); /// This is to set the oldest possible @a timeslice this relayer can diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3925359b056b2..5c6d98e01da03 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1854,11 +1854,59 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& VariableContextHelpers::getTimeslice(variables); forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true); }; + + auto onInsertion = [](ServiceRegistryRef& ref, std::span& messages) { + O2_LOG_ENABLE(forwarding); // NOTE(review): the early return below skips the matching O2_LOG_DISABLE(forwarding); confirm this is intended. + O2_SIGNPOST_ID_GENERATE(sid, forwarding); + + auto& spec = ref.get(); + auto& context = ref.get(); + if (!context.canForwardEarly || spec.forwards.empty()) { + O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed."); + return; + } + + O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer."); + auto& timesliceIndex = ref.get(); + auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput(); + + auto& proxy = ref.get(); + + O2_SIGNPOST_START(forwarding, sid, 
"forwardInputs", + "Starting forwarding for incoming messages with oldestTimeslice %zu with copy", + oldestTimeslice.timeslice.value); + std::vector forwardedParts(proxy.getNumForwardChannels()); + DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false); + + for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { + if (forwardedParts[fi].Size() == 0) { + continue; + } + ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi}); + auto& parts = forwardedParts[fi]; + if (info.policy == nullptr) { + O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi); + continue; + } + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi); + info.policy->forward(parts, ChannelIndex{fi}, ref); + } + auto& asyncQueue = ref.get(); + auto& decongestion = ref.get(); + O2_SIGNPOST_ID_GENERATE(aid, async_queue); + O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value); + AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate} + .user({.ref = ref, .oldestTimeslice = oldestTimeslice})); + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); + O2_LOG_DISABLE(forwarding); + }; + auto relayed = relayer.relay(parts.At(headerIndex)->GetData(), &parts.At(headerIndex), input, nMessages, nPayloadsPerHeader, + onInsertion, onDrop); switch (relayed.type) { case DataRelayer::RelayChoice::Type::Backpressured: @@ -2273,9 +2321,13 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting; if (context.canForwardEarly && hasForwards && 
consumeSomething) { - O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str()); - auto& timesliceIndex = ref.get(); - forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume); + // We used to do forwarding here, however we now do it much earlier. + // We still need to clean the inputs which were already consumed + // via ConsumeExisting and which still have a header to hold the slot. + // FIXME: do we? This should really happen when we do the forwarding on + // insertion, because otherwise we lose the relevant information on how to + // navigate the set of headers. We could actually rely on the messageset index, + // is that the right thing to do though? } markInputsAsDone(action.slot); diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 2f7a1f65f3bd3..b1d1581f50edc 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -343,8 +343,7 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy, const bool copyByDefault, bool consume) -> std::vector { // we collect all messages per forward in a map and send them together - std::vector forwardedParts; - forwardedParts.resize(proxy.getNumForwards()); + std::vector forwardedParts(proxy.getNumForwardChannels()); std::vector forwardingChoices{}; for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { diff --git a/Framework/Core/src/DataRefUtils.cxx b/Framework/Core/src/DataRefUtils.cxx index 69eb1dc7faba6..dc278911a71c7 100644 --- a/Framework/Core/src/DataRefUtils.cxx +++ b/Framework/Core/src/DataRefUtils.cxx @@ -72,6 +72,7 @@ void* extractFromTFile(TFile& file, TClass const* cl, const char* what) return result; } } // namespace + // Adapted from CcdbApi private method interpretAsTMemFileAndExtract // If the former is 
moved to public, throws on error and could be changed to // not require a mutex we could use it. diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 01e7a2b29fd35..ea2c4c0b73316 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -436,7 +436,8 @@ DataRelayer::RelayChoice InputInfo const& info, size_t nMessages, size_t nPayloads, - std::function&, TimesliceIndex::OldestOutputInfo)> onDrop) + OnInsertionCallback onInsertion, + OnDropCallback onDrop) { std::scoped_lock lock(mMutex); DataProcessingHeader const* dph = o2::header::get(rawHeader); @@ -482,6 +483,7 @@ DataRelayer::RelayChoice &messages, &nMessages, &nPayloads, + &onInsertion, &cache = mCache, &services = mContext, numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t { @@ -512,7 +514,11 @@ DataRelayer::RelayChoice mi += nPayloads; continue; } - target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1); + auto span = std::span(messages + mi, messages + mi + nPayloads + 1); + if (onInsertion) { + onInsertion(services, span); + } + target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1); mi += nPayloads; saved += nPayloads; } diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index fe9f70d1daadb..7081d600080b1 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -616,6 +616,80 @@ TEST_CASE("ForwardInputsSplitPayload") CHECK(result[1].Size() == 3); } +TEST_CASE("ForwardInputsSplitPayloadNoMessageSet") +{ + o2::header::DataHeader dh; + dh.dataOrigin = "TST"; + dh.dataDescription = "A"; + dh.subSpecification = 0; + dh.splitPayloadIndex = 2; + dh.splitPayloadParts = 2; + + o2::header::DataHeader dh2; + dh2.dataOrigin = "TST"; + dh2.dataDescription = "B"; + 
dh2.subSpecification = 0; + dh2.splitPayloadIndex = 0; + dh2.splitPayloadParts = 1; + + o2::framework::DataProcessingHeader dph{0, 1}; + + std::vector channels{ + fair::mq::Channel("from_A_to_B"), + fair::mq::Channel("from_A_to_C"), + }; + + bool consume = true; + bool copyByDefault = true; + FairMQDeviceProxy proxy; + std::vector routes{ + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}}, + .channel = "from_A_to_B", + .policy = nullptr, + }, + ForwardRoute{ + .timeslice = 0, + .maxTimeslices = 1, + .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}}, + .channel = "from_A_to_C", + .policy = nullptr, + }}; + + auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& { + for (auto& channel : channels) { + if (channel.GetName() == channelName) { + return channel; + } + } + throw std::runtime_error("Channel not found"); + }; + + proxy.bind({}, {}, routes, findChannelByName, nullptr); + + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + fair::mq::MessagePtr payload1(transport->CreateMessage()); + fair::mq::MessagePtr payload2(transport->CreateMessage()); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + std::vector> messages; + messages.push_back(std::move(header)); + messages.push_back(std::move(payload1)); + messages.push_back(std::move(payload2)); + auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph}); + messages.push_back(std::move(header2)); + messages.push_back(transport->CreateMessage()); + + std::vector result(2); + auto span = std::span(messages); + o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume); + REQUIRE(result.size() == 2); // Two routes + CHECK(result[0].Size() == 2); // Two parts expected on this route + CHECK(result[1].Size() == 3); +} + 
TEST_CASE("ForwardInputEOSSingleRoute") { o2::framework::SourceInfoHeader sih{};