diff --git a/phlex/core/CMakeLists.txt b/phlex/core/CMakeLists.txt index e2e24664e..60c307cc5 100644 --- a/phlex/core/CMakeLists.txt +++ b/phlex/core/CMakeLists.txt @@ -15,6 +15,7 @@ cet_make_library( detail/make_algorithm_name.cpp detail/maybe_predicates.cpp detail/filter_impl.cpp + detail/repeater_node.cpp edge_creation_policy.cpp edge_maker.cpp filter.cpp @@ -22,9 +23,8 @@ cet_make_library( glue.cpp input_arguments.cpp message.cpp - message_sender.cpp node_catalog.cpp - multiplexer.cpp + index_router.cpp products_consumer.cpp registrar.cpp registration_api.cpp @@ -61,8 +61,7 @@ install( graph_proxy.hpp input_arguments.hpp message.hpp - message_sender.hpp - multiplexer.hpp + index_router.hpp node_catalog.hpp product_query.hpp products_consumer.hpp @@ -74,7 +73,11 @@ install( ) install(FILES fold/send.hpp DESTINATION include/phlex/core/fold) install( - FILES detail/make_algorithm_name.hpp detail/maybe_predicates.hpp detail/filter_impl.hpp + FILES + detail/filter_impl.hpp + detail/make_algorithm_name.hpp + detail/maybe_predicates.hpp + detail/repeater_node.hpp DESTINATION include/phlex/core/detail ) target_include_directories(phlex_core PRIVATE ${PROJECT_SOURCE_DIR}) diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index c1e337bf8..387fe0e26 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -7,6 +7,7 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/input_arguments.hpp" #include "phlex/core/message.hpp" +#include "phlex/core/multilayer_join_node.hpp" #include "phlex/core/product_query.hpp" #include "phlex/core/products_consumer.hpp" #include "phlex/core/store_counters.hpp" @@ -42,6 +43,7 @@ namespace phlex::experimental { virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; + virtual tbb::flow::receiver& flush_port() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; }; @@ -75,59 +77,70 @@ namespace phlex::experimental 
{ initializer_{std::move(initializer)}, output_{to_product_specifications(full_name(), std::move(output), make_type_ids())}, partition_{std::move(partition)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, - fold_{g, - concurrency, - [this, ft = alg.release_algorithm()](messages_t const& messages, auto& outputs) { - // N.B. The assumption is that a fold will *never* need to cache - // the product store it creates. Any flush messages *do not* need - // to be propagated to downstream nodes. - auto const& msg = most_derived(messages); - auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id); - - if (not store->is_flush() and not store->index()->parent(partition_)) { - return; - } - - if (store->is_flush()) { - // Downstream nodes always get the flush. - get<0>(outputs).try_put(msg); - if (store->index()->layer_name() != partition_) { - return; - } - } - - auto const& fold_index = - store->is_flush() ? store->index() : store->index()->parent(partition_); - assert(fold_index); - auto const& id_hash_for_counter = fold_index->hash(); - - if (store->is_flush()) { - counter_for(id_hash_for_counter).set_flush_value(store, original_message_id); - } else { - call(ft, messages, std::make_index_sequence{}); - counter_for(id_hash_for_counter).increment(store->index()->layer_hash()); - } - - if (auto counter = done_with(id_hash_for_counter)) { - auto parent = std::make_shared(fold_index, this->full_name()); - commit_(*parent); - ++product_count_; - get<0>(outputs).try_put({parent, counter->original_message_id()}); - } - }} + flush_receiver_{g, + tbb::flow::unlimited, + [this](flush_message const& msg) -> tbb::flow::continue_msg { + auto const& [index, counts, original_message_id] = msg; + if (index->layer_name() != partition_) { + return {}; + } + + counter_for(index->hash()).set_flush_value(counts, original_message_id); + emit_and_evict_if_done(index); + return {}; + }}, + join_{make_join_or_none( + g, full_name(), layers())}, // FIXME: This 
should change to include result product! + fold_{ + g, concurrency, [this, ft = alg.release_algorithm()](messages_t const& messages, auto&) { + // N.B. The assumption is that a fold will *never* need to cache + // the product store it creates. Any flush messages *do not* need + // to be propagated to downstream nodes. + auto const& msg = most_derived(messages); + auto const& index = msg.store->index(); + + auto fold_index = index->parent(partition_); + if (not fold_index) { + return; + } + + auto const& index_hash_for_counter = fold_index->hash(); + + call(ft, messages, std::make_index_sequence{}); + ++calls_; + + counter_for(index_hash_for_counter).increment(index->layer_hash()); + + emit_and_evict_if_done(fold_index); + }} { - make_edge(join_, fold_); + if constexpr (N > 1ull) { + make_edge(join_, fold_); + } } private: + void emit_and_evict_if_done(data_cell_index_ptr const& fold_index) + { + if (auto counter = done_with(fold_index->hash())) { + auto parent = std::make_shared(fold_index, this->full_name()); + commit_(parent); + ++product_count_; + output_port<0>(fold_).try_put({parent, counter->original_message_id()}); + } + } + tbb::flow::receiver& port_for(product_query const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label, fold_); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_, fold_); + } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return output_port<0ull>(fold_); } tbb::flow::sender& to_output() override { return sender(); } product_specifications const& output() const override { return output_; } @@ -135,21 +148,27 @@ namespace phlex::experimental { template void call(function_t const& ft, messages_t const& messages, std::index_sequence) { - auto const& parent_id = *most_derived(messages).store->index()->parent(partition_); 
+ auto const parent_index = most_derived(messages).store->index()->parent(partition_); + // FIXME: Not the safest approach! - auto it = results_.find(parent_id); + auto it = results_.find(parent_index->hash()); if (it == results_.end()) { it = results_ - .insert({parent_id, + .insert({parent_index->hash(), initialized_object(std::move(initializer_), std::make_index_sequence>{})}) .first; } - ++calls_; - return std::invoke(ft, *it->second, std::get(input_).retrieve(std::get(messages))...); + + if constexpr (N == 1ull) { + std::invoke(ft, *it->second, std::get(input_).retrieve(messages)...); + } else { + std::invoke(ft, *it->second, std::get(input_).retrieve(std::get(messages))...); + } } + named_index_ports index_ports() final { return join_.index_ports(); } std::size_t num_calls() const final { return calls_.load(); } std::size_t product_count() const final { return product_count_.load(); } @@ -160,13 +179,13 @@ namespace phlex::experimental { new R{std::forward>(std::get(tuple))...}}; } - void commit_(product_store& store) + auto commit_(product_store_ptr& store) { - auto& result = results_.at(*store.index()); + auto& result = results_.at(store->index()->hash()); if constexpr (requires { send(*result); }) { - store.add_product(output()[0].name(), send(*result)); + store->add_product(output()[0].name(), send(*result)); } else { - store.add_product(output()[0].name(), std::move(*result)); + store->add_product(output()[0].name(), std::move(result)); } // Reclaim some memory; it would be better to erase the entire entry from the map, // but that is not thread-safe. 
@@ -177,9 +196,10 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; std::string partition_; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<1>> fold_; - tbb::concurrent_unordered_map> results_; + tbb::flow::multifunction_node, message_tuple<1>> fold_; + tbb::concurrent_unordered_map> results_; std::atomic calls_; std::atomic product_count_; }; diff --git a/phlex/core/declared_observer.cpp b/phlex/core/declared_observer.cpp index 2f53f66e9..9661cfeb4 100644 --- a/phlex/core/declared_observer.cpp +++ b/phlex/core/declared_observer.cpp @@ -1,8 +1,5 @@ #include "phlex/core/declared_observer.hpp" -#include "fmt/std.h" -#include "spdlog/spdlog.h" - namespace phlex::experimental { declared_observer::declared_observer(algorithm_name name, std::vector predicates, @@ -12,15 +9,4 @@ namespace phlex::experimental { } declared_observer::~declared_observer() = default; - - void declared_observer::report_cached_hashes( - tbb::concurrent_hash_map const& hashes) const - { - if (hashes.size() > 0ull) { - spdlog::warn("Monitor {} has {} cached hashes.", full_name(), hashes.size()); - } - for (auto const& [id, _] : hashes) { - spdlog::debug(" => ID: {}", id); - } - } } diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index eebe0235b..f9d8287a4 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -5,6 +5,7 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/input_arguments.hpp" #include "phlex/core/message.hpp" +#include "phlex/core/multilayer_join_node.hpp" #include "phlex/core/product_query.hpp" #include "phlex/core/products_consumer.hpp" #include "phlex/core/store_counters.hpp" @@ -16,7 +17,6 @@ #include "phlex/model/product_store.hpp" #include "phlex/utilities/simple_ptr_map.hpp" -#include "oneapi/tbb/concurrent_hash_map.h" #include "oneapi/tbb/flow_graph.h" #include @@ -36,12 +36,6 @@ 
namespace phlex::experimental { std::vector predicates, product_queries input_products); virtual ~declared_observer(); - - protected: - using hashes_t = tbb::concurrent_hash_map; - using accessor = hashes_t::accessor; - - void report_cached_hashes(hashes_t const& hashes) const; }; using declared_observer_ptr = std::unique_ptr; @@ -50,7 +44,7 @@ namespace phlex::experimental { // ===================================================================================== template - class observer_node : public declared_observer, private detect_flush_flag { + class observer_node : public declared_observer { using InputArgs = typename AlgorithmBits::input_parameter_types; using function_t = typename AlgorithmBits::bound_type; static constexpr auto N = AlgorithmBits::number_inputs; @@ -66,61 +60,48 @@ namespace phlex::experimental { AlgorithmBits alg, product_queries input_products) : declared_observer{std::move(name), std::move(predicates), std::move(input_products)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, full_name(), layers())}, observer_{g, concurrency, [this, ft = alg.release_algorithm()]( messages_t const& messages) -> oneapi::tbb::flow::continue_msg { - auto const& msg = most_derived(messages); - auto const& [store, message_id] = std::tie(msg.store, msg.id); - if (store->is_flush()) { - mark_flush_received(store->index()->hash(), message_id); - } else if (accessor a; needs_new(store, a)) { - call(ft, messages, std::make_index_sequence{}); - a->second = true; - mark_processed(store->index()->hash()); - } - - if (done_with(store)) { - cached_hashes_.erase(store->index()->hash()); - } + call(ft, messages, std::make_index_sequence{}); + ++calls_; return {}; }} { - make_edge(join_, observer_); + if constexpr (N > 1ull) { + make_edge(join_, observer_); + } } - ~observer_node() { report_cached_hashes(cached_hashes_); } - private: tbb::flow::receiver& port_for(product_query const& product_label) override { - return 
receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label, observer_); } - std::vector*> ports() override { return input_ports(join_); } - - bool needs_new(product_store_const_ptr const& store, accessor& a) + std::vector*> ports() override { - if (cached_hashes_.count(store->index()->hash()) > 0ull) { - return false; - } - return cached_hashes_.insert(a, store->index()->hash()); + return input_ports(join_, observer_); } template void call(function_t const& ft, messages_t const& messages, std::index_sequence) { - ++calls_; - return std::invoke(ft, std::get(input_).retrieve(std::get(messages))...); + if constexpr (N == 1ull) { + std::invoke(ft, std::get(input_).retrieve(messages)...); + } else { + std::invoke(ft, std::get(input_).retrieve(std::get(messages))...); + } } + named_index_ports index_ports() final { return join_.index_ports(); } std::size_t num_calls() const final { return calls_.load(); } input_retriever_types input_{input_arguments()}; join_or_none_t join_; tbb::flow::function_node> observer_; - hashes_t cached_hashes_; std::atomic calls_; }; } diff --git a/phlex/core/declared_output.cpp b/phlex/core/declared_output.cpp index 4f14fb2cf..5dbfa4455 100644 --- a/phlex/core/declared_output.cpp +++ b/phlex/core/declared_output.cpp @@ -10,10 +10,8 @@ namespace phlex::experimental { detail::output_function_t&& ft) : consumer{std::move(name), std::move(predicates)}, node_{g, concurrency, [this, f = std::move(ft)](message const& msg) -> tbb::flow::continue_msg { - if (not msg.store->is_flush()) { - f(*msg.store); - ++calls_; - } + f(*msg.store); + ++calls_; return {}; }} { diff --git a/phlex/core/declared_predicate.cpp b/phlex/core/declared_predicate.cpp index e3761cd40..056d8ea9e 100644 --- a/phlex/core/declared_predicate.cpp +++ b/phlex/core/declared_predicate.cpp @@ -1,8 +1,5 @@ #include "phlex/core/declared_predicate.hpp" -#include "fmt/std.h" -#include "spdlog/spdlog.h" - namespace phlex::experimental { 
declared_predicate::declared_predicate(algorithm_name name, std::vector predicates, @@ -12,11 +9,4 @@ namespace phlex::experimental { } declared_predicate::~declared_predicate() = default; - - void declared_predicate::report_cached_results(results_t const& results) const - { - if (results.size() > 0ull) { - spdlog::warn("Filter {} has {} cached results.", full_name(), results.size()); - } - } } diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index 2606cdc1c..dfc847839 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -6,9 +6,9 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/input_arguments.hpp" #include "phlex/core/message.hpp" +#include "phlex/core/multilayer_join_node.hpp" #include "phlex/core/product_query.hpp" #include "phlex/core/products_consumer.hpp" -#include "phlex/core/store_counters.hpp" #include "phlex/metaprogramming/type_deduction.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" @@ -16,7 +16,6 @@ #include "phlex/model/product_store.hpp" #include "phlex/utilities/simple_ptr_map.hpp" -#include "oneapi/tbb/concurrent_hash_map.h" #include "oneapi/tbb/flow_graph.h" #include @@ -41,13 +40,6 @@ namespace phlex::experimental { virtual ~declared_predicate(); virtual tbb::flow::sender& sender() = 0; - - protected: - using results_t = tbb::concurrent_hash_map; - using accessor = results_t::accessor; - using const_accessor = results_t::const_accessor; - - void report_cached_results(results_t const& results) const; }; using declared_predicate_ptr = std::unique_ptr; @@ -56,7 +48,7 @@ namespace phlex::experimental { // ===================================================================================== template - class predicate_node : public declared_predicate, private detect_flush_flag { + class predicate_node : public declared_predicate { using InputArgs = typename AlgorithmBits::input_parameter_types; using function_t = typename 
AlgorithmBits::bound_type; static constexpr auto N = AlgorithmBits::number_inputs; @@ -72,58 +64,52 @@ namespace phlex::experimental { AlgorithmBits alg, product_queries input_products) : declared_predicate{std::move(name), std::move(predicates), std::move(input_products)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, full_name(), layers())}, predicate_{ g, concurrency, [this, ft = alg.release_algorithm()](messages_t const& messages) -> predicate_result { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); - predicate_result result{}; - if (store->is_flush()) { - mark_flush_received(store->index()->hash(), message_id); - } else if (const_accessor a; results_.find(a, store->index()->hash())) { - result = {message_id, a->second.result}; - } else if (accessor a; results_.insert(a, store->index()->hash())) { - bool const rc = call(ft, messages, std::make_index_sequence{}); - result = a->second = {message_id, rc}; - mark_processed(store->index()->hash()); - } - - if (done_with(store)) { - results_.erase(store->index()->hash()); - } - return result; + + bool const rc = call(ft, messages, std::make_index_sequence{}); + ++calls_; + return {message_id, rc}; }} { - make_edge(join_, predicate_); + if constexpr (N > 1ull) { + make_edge(join_, predicate_); + } } - ~predicate_node() { report_cached_results(results_); } - private: tbb::flow::receiver& port_for(product_query const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label, predicate_); } - std::vector*> ports() override { return input_ports(join_); } - + std::vector*> ports() override + { + return input_ports(join_, predicate_); + } tbb::flow::sender& sender() override { return predicate_; } template bool call(function_t const& ft, messages_t const& messages, std::index_sequence) { - ++calls_; - return std::invoke(ft, 
std::get(input_).retrieve(std::get(messages))...); + if constexpr (N == 1ull) { + return std::invoke(ft, std::get(input_).retrieve(messages)...); + } else { + return std::invoke(ft, std::get(input_).retrieve(std::get(messages))...); + } } + named_index_ports index_ports() final { return join_.index_ports(); } std::size_t num_calls() const final { return calls_.load(); } input_retriever_types input_{input_arguments()}; join_or_none_t join_; tbb::flow::function_node, predicate_result> predicate_; - results_t results_; std::atomic calls_; }; diff --git a/phlex/core/declared_provider.cpp b/phlex/core/declared_provider.cpp index 85e1c79e1..251a3eeee 100644 --- a/phlex/core/declared_provider.cpp +++ b/phlex/core/declared_provider.cpp @@ -15,17 +15,5 @@ namespace phlex::experimental { return output_product_; } - void declared_provider::report_cached_stores(stores_t const& stores) const - { - if (stores.size() > 0ull) { - spdlog::warn("Provider {} has {} cached stores.", full_name(), stores.size()); - } - for (auto const& [hash, store] : stores) { - if (not store) { - spdlog::warn("Store with hash {} is null!", hash); - continue; - } - spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash); - } - } + std::string const& declared_provider::layer() const noexcept { return output_product_.layer(); } } diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 57e2a3f93..9ba012029 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -4,16 +4,12 @@ #include "phlex/core/concepts.hpp" #include "phlex/core/fwd.hpp" #include "phlex/core/message.hpp" -#include "phlex/core/store_counters.hpp" -#include "phlex/metaprogramming/type_deduction.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" #include "phlex/model/product_specification.hpp" #include "phlex/model/product_store.hpp" #include "phlex/utilities/simple_ptr_map.hpp" -#include 
"oneapi/tbb/concurrent_hash_map.h" -#include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" #include "spdlog/spdlog.h" @@ -33,17 +29,12 @@ namespace phlex::experimental { std::string full_name() const; product_query const& output_product() const noexcept; + std::string const& layer() const noexcept; - virtual tbb::flow::receiver* input_port() = 0; + virtual tbb::flow::receiver* input_port() = 0; virtual tbb::flow::sender& sender() = 0; virtual std::size_t num_calls() const = 0; - protected: - using stores_t = tbb::concurrent_hash_map; - using const_accessor = stores_t::const_accessor; - - void report_cached_stores(stores_t const& stores) const; - private: algorithm_name name_; product_query output_product_; @@ -55,9 +46,7 @@ namespace phlex::experimental { // ===================================================================================== template - class provider_node : public declared_provider, private detect_flush_flag { - using function_t = typename AlgorithmBits::bound_type; - + class provider_node : public declared_provider { public: using node_ptr_type = declared_provider_ptr; @@ -68,63 +57,38 @@ namespace phlex::experimental { product_query output) : declared_provider{std::move(name), output}, output_{output.spec()}, - provider_{ - g, concurrency, [this, ft = alg.release_algorithm()](message const& msg, auto& output) { - auto& [stay_in_graph, to_output] = output; - - if (msg.store->is_flush()) { - mark_flush_received(msg.store->index()->hash(), msg.original_id); - stay_in_graph.try_put(msg); - } else { - // Check cache first - auto index_hash = msg.store->index()->hash(); - if (const_accessor ca; cache_.find(ca, index_hash)) { - // Cache hit - reuse the cached store - message const new_msg{ca->second, msg.id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); - return; - } - - // Cache miss - compute the result - auto result = std::invoke(ft, *msg.store->index()); - ++calls_; - - products new_products; - 
new_products.add(output_.name(), std::move(result)); - auto store = std::make_shared( - msg.store->index(), this->full_name(), std::move(new_products)); - - // Store in cache - cache_.emplace(index_hash, store); - - message const new_msg{store, msg.id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); - mark_processed(msg.store->index()->hash()); - } - - if (done_with(msg.store)) { - cache_.erase(msg.store->index()->hash()); - } - }} + provider_{g, + concurrency, + [this, ft = alg.release_algorithm()](index_message const& index_msg, auto& output) { + auto& [stay_in_graph, to_output] = output; + auto const [index, msg_id, _] = index_msg; + + auto result = std::invoke(ft, *index); + ++calls_; + + products new_products; + new_products.add(output_.name(), std::move(result)); + auto store = std::make_shared( + index, this->full_name(), std::move(new_products)); + + message const new_msg{store, msg_id}; + stay_in_graph.try_put(new_msg); + to_output.try_put(new_msg); + }} { spdlog::debug( "Created provider node {} making output {}", this->full_name(), output.to_string()); } - ~provider_node() { report_cached_stores(cache_); } - private: - tbb::flow::receiver* input_port() override { return &provider_; } + tbb::flow::receiver* input_port() override { return &provider_; } tbb::flow::sender& sender() override { return output_port<0>(provider_); } std::size_t num_calls() const final { return calls_.load(); } product_specification output_; - tbb::flow::multifunction_node> provider_; + tbb::flow::multifunction_node> provider_; std::atomic calls_; - stores_t cache_; }; } diff --git a/phlex/core/declared_transform.cpp b/phlex/core/declared_transform.cpp index 83c34cd95..52bdd5197 100644 --- a/phlex/core/declared_transform.cpp +++ b/phlex/core/declared_transform.cpp @@ -1,8 +1,5 @@ #include "phlex/core/declared_transform.hpp" -#include "fmt/std.h" -#include "spdlog/spdlog.h" - namespace phlex::experimental { declared_transform::declared_transform(algorithm_name 
name, std::vector predicates, @@ -12,18 +9,4 @@ namespace phlex::experimental { } declared_transform::~declared_transform() = default; - - void declared_transform::report_cached_stores(stores_t const& stores) const - { - if (stores.size() > 0ull) { - spdlog::warn("Transform {} has {} cached stores.", full_name(), stores.size()); - } - for (auto const& [hash, store] : stores) { - if (not store) { - spdlog::warn("Store with hash {} is null!", hash); - continue; - } - spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash); - } - } } diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index fdb04738b..4e9afe81d 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -8,9 +8,9 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/input_arguments.hpp" #include "phlex/core/message.hpp" +#include "phlex/core/multilayer_join_node.hpp" #include "phlex/core/product_query.hpp" #include "phlex/core/products_consumer.hpp" -#include "phlex/core/store_counters.hpp" #include "phlex/metaprogramming/type_deduction.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" @@ -19,7 +19,6 @@ #include "phlex/model/product_store.hpp" #include "phlex/utilities/simple_ptr_map.hpp" -#include "oneapi/tbb/concurrent_hash_map.h" #include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" @@ -48,13 +47,6 @@ namespace phlex::experimental { virtual tbb::flow::sender& to_output() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; - - protected: - using stores_t = tbb::concurrent_hash_map; - using accessor = stores_t::accessor; - using const_accessor = stores_t::const_accessor; - - void report_cached_stores(stores_t const& stores) const; }; using declared_transform_ptr = std::unique_ptr; @@ -63,7 +55,7 @@ namespace phlex::experimental { // 
===================================================================================== template - class transform_node : public declared_transform, private detect_flush_flag { + class transform_node : public declared_transform { using function_t = typename AlgorithmBits::bound_type; using input_parameter_types = typename AlgorithmBits::input_parameter_types; @@ -84,54 +76,42 @@ namespace phlex::experimental { declared_transform{std::move(name), std::move(predicates), std::move(input_products)}, output_{to_product_specifications( full_name(), std::move(output), make_output_type_ids())}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, full_name(), layers())}, transform_{g, concurrency, [this, ft = alg.release_algorithm()](messages_t const& messages, auto& output) { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); auto& [stay_in_graph, to_output] = output; - if (store->is_flush()) { - mark_flush_received(store->index()->hash(), msg.original_id); - stay_in_graph.try_put(msg); - to_output.try_put(msg); - } else { - accessor a; - if (stores_.insert(a, store->index()->hash())) { - auto result = call(ft, messages, std::make_index_sequence{}); - ++calls_; - ++product_count_[store->index()->layer_hash()]; - products new_products; - new_products.add_all(output_, std::move(result)); - a->second = std::make_shared( - store->index(), this->full_name(), std::move(new_products)); - - message const new_msg{a->second, message_id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); - mark_processed(store->index()->hash()); - } else { - stay_in_graph.try_put({a->second, message_id}); - } - } - - if (done_with(store)) { - stores_.erase(store->index()->hash()); - } + + auto result = call(ft, messages, std::make_index_sequence{}); + ++calls_; + ++product_count_[store->index()->layer_hash()]; + products new_products; + new_products.add_all(output_, std::move(result)); + auto 
new_store = std::make_shared( + store->index(), this->full_name(), std::move(new_products)); + + message const new_msg{std::move(new_store), message_id}; + stay_in_graph.try_put(new_msg); + to_output.try_put(new_msg); }} { - make_edge(join_, transform_); + if constexpr (N > 1ull) { + make_edge(join_, transform_); + } } - ~transform_node() { report_cached_stores(stores_); } - private: tbb::flow::receiver& port_for(product_query const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label, transform_); } - std::vector*> ports() override { return input_ports(join_); } + std::vector*> ports() override + { + return input_ports(join_, transform_); + } tbb::flow::sender& sender() override { return output_port<0>(transform_); } tbb::flow::sender& to_output() override { return output_port<1>(transform_); } @@ -140,9 +120,14 @@ namespace phlex::experimental { template auto call(function_t const& ft, messages_t const& messages, std::index_sequence) { - return std::invoke(ft, std::get(input_).retrieve(std::get(messages))...); + if constexpr (N == 1ull) { + return std::invoke(ft, std::get(input_).retrieve(messages)...); + } else { + return std::invoke(ft, std::get(input_).retrieve(std::get(messages))...); + } } + named_index_ports index_ports() final { return join_.index_ports(); } std::size_t num_calls() const final { return calls_.load(); } std::size_t product_count() const final { @@ -156,8 +141,7 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<2u>> transform_; - stores_t stores_; + tbb::flow::multifunction_node, message_tuple<2u>> transform_; std::atomic calls_; tbb::concurrent_unordered_map> product_count_; }; diff --git a/phlex/core/declared_unfold.cpp b/phlex/core/declared_unfold.cpp index 3339a105e..273431329 100644 --- a/phlex/core/declared_unfold.cpp +++ 
b/phlex/core/declared_unfold.cpp @@ -23,36 +23,22 @@ namespace phlex::experimental { return std::make_shared(child_index, node_name_, std::move(new_products)); } - product_store_const_ptr generator::flush_store() const + flush_counts_ptr generator::flush_result() const { - auto result = parent_->make_flush(); if (not child_counts_.empty()) { - result->add_product("[flush]", - std::make_shared(std::move(child_counts_))); + return std::make_shared(std::move(child_counts_)); } - return result; + return nullptr; } declared_unfold::declared_unfold(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + std::string child_layer) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products)}, + child_layer_{std::move(child_layer)} { } declared_unfold::~declared_unfold() = default; - - void declared_unfold::report_cached_stores(stores_t const& stores) const - { - if (stores.size() > 0ull) { - spdlog::warn("Unfold {} has {} cached stores.", full_name(), stores.size()); - } - for (auto const& [hash, store] : stores) { - if (not store) { - spdlog::warn("Store with hash {} is null!", hash); - continue; - } - spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash); - } - } } diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 967f76d0c..4b3212f92 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -5,8 +5,8 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/input_arguments.hpp" #include "phlex/core/message.hpp" +#include "phlex/core/multilayer_join_node.hpp" #include "phlex/core/products_consumer.hpp" -#include "phlex/core/store_counters.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" #include "phlex/model/handle.hpp" @@ -14,7 +14,6 @@ #include "phlex/model/product_store.hpp" #include 
"phlex/utilities/simple_ptr_map.hpp" -#include "oneapi/tbb/concurrent_hash_map.h" #include "oneapi/tbb/flow_graph.h" #include @@ -37,7 +36,7 @@ namespace phlex::experimental { explicit generator(product_store_const_ptr const& parent, std::string node_name, std::string const& child_layer_name); - product_store_const_ptr flush_store() const; + flush_counts_ptr flush_result() const; product_store_const_ptr make_child_for(std::size_t const data_cell_number, products new_products) @@ -57,20 +56,21 @@ namespace phlex::experimental { public: declared_unfold(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + std::string child_layer); virtual ~declared_unfold(); virtual tbb::flow::sender& sender() = 0; + virtual tbb::flow::sender& output_index_port() = 0; virtual tbb::flow::sender& to_output() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; + virtual flusher_t& flusher() = 0; - protected: - using stores_t = tbb::concurrent_hash_map; - using accessor = stores_t::accessor; - using const_accessor = stores_t::const_accessor; + std::string const& child_layer() const noexcept { return child_layer_; } - void report_cached_stores(stores_t const& stores) const; + private: + std::string child_layer_; }; using declared_unfold_ptr = std::unique_ptr; @@ -79,7 +79,7 @@ namespace phlex::experimental { // ===================================================================================== template - class unfold_node : public declared_unfold, private detect_flush_flag { + class unfold_node : public declared_unfold { using InputArgs = constructor_parameter_types; static constexpr std::size_t N = std::tuple_size_v; static constexpr std::size_t M = number_output_objects; @@ -94,52 +94,52 @@ namespace phlex::experimental { product_queries product_labels, std::vector output_products, std::string child_layer_name) : - declared_unfold{std::move(name), 
std::move(predicates), std::move(product_labels)}, + declared_unfold{std::move(name), + std::move(predicates), + std::move(product_labels), + std::move(child_layer_name)}, output_{to_product_specifications(full_name(), std::move(output_products), make_type_ids>>())}, - child_layer_name_{std::move(child_layer_name)}, - join_{make_join_or_none(g, std::make_index_sequence{})}, + join_{make_join_or_none(g, full_name(), layers())}, unfold_{g, concurrency, [this, p = std::move(predicate), ufold = std::move(unfold)]( - messages_t const& messages, auto& output) { + messages_t const& messages, auto&) { auto const& msg = most_derived(messages); auto const& store = msg.store; - if (store->is_flush()) { - mark_flush_received(store->index()->hash(), msg.id); - std::get<0>(output).try_put(msg); - } else if (accessor a; stores_.insert(a, store->index()->hash())) { - std::size_t const original_message_id{msg_counter_}; - generator g{msg.store, this->full_name(), child_layer_name_}; - call(p, ufold, msg.store->index(), g, messages, std::make_index_sequence{}); - - message const flush_msg{ - g.flush_store(), msg_counter_.fetch_add(1), original_message_id}; - std::get<0>(output).try_put(flush_msg); - mark_processed(store->index()->hash()); - } - - if (done_with(store)) { - stores_.erase(store->index()->hash()); - } - }} + + std::size_t const original_message_id{msg_counter_}; + generator g{store, this->full_name(), child_layer()}; + call(p, ufold, store->index(), g, messages, std::make_index_sequence{}); + + flusher_.try_put({store->index(), g.flush_result(), original_message_id}); + }}, + flusher_{g} { - make_edge(join_, unfold_); + if constexpr (N > 1ull) { + make_edge(join_, unfold_); + } } - ~unfold_node() { report_cached_stores(stores_); } - private: tbb::flow::receiver& port_for(product_query const& product_label) override { - return receiver_for(join_, input(), product_label); + return receiver_for(join_, input(), product_label, unfold_); + } + std::vector*> ports() override + 
{ + return input_ports(join_, unfold_); } - std::vector*> ports() override { return input_ports(join_); } tbb::flow::sender& sender() override { return output_port<0>(unfold_); } + tbb::flow::sender& output_index_port() override + { + return output_port<1>(unfold_); + } tbb::flow::sender& to_output() override { return sender(); } product_specifications const& output() const override { return output_; } + flusher_t& flusher() override { return flusher_; } template void call(Predicate const& predicate, @@ -150,12 +150,18 @@ namespace phlex::experimental { std::index_sequence) { ++calls_; - Object obj(std::get(input_).retrieve(std::get(messages))...); + Object obj = [this, &messages]() { + if constexpr (N == 1ull) { + return Object(std::get(input_).retrieve(messages)...); + } else { + return Object(std::get(input_).retrieve(std::get(messages))...); + } + }(); std::size_t counter = 0; auto running_value = obj.initial_value(); while (std::invoke(predicate, obj, running_value)) { products new_products; - auto new_id = unfolded_id->make_child(counter, child_layer_name_); + auto new_id = unfolded_id->make_child(counter, child_layer()); if constexpr (requires { std::invoke(unfold, obj, running_value, *new_id); }) { auto [next_value, prods] = std::invoke(unfold, obj, running_value, *new_id); new_products.add_all(output_, std::move(prods)); @@ -169,22 +175,22 @@ namespace phlex::experimental { auto child = g.make_child_for(counter++, std::move(new_products)); message const child_msg{child, msg_counter_.fetch_add(1)}; output_port<0>(unfold_).try_put(child_msg); + output_port<1>(unfold_).try_put(child->index()); // Every data cell needs a flush (for now) - message const child_flush_msg{child->make_flush(), msg_counter_.fetch_add(1)}; - output_port<0>(unfold_).try_put(child_flush_msg); + flusher_.try_put({child->index(), nullptr, -1ull}); } } + named_index_ports index_ports() final { return join_.index_ports(); } std::size_t num_calls() const final { return calls_.load(); } 
std::size_t product_count() const final { return product_count_.load(); } input_retriever_types input_{input_arguments()}; product_specifications output_; - std::string child_layer_name_; join_or_none_t join_; - tbb::flow::multifunction_node, messages_t<1u>> unfold_; - tbb::concurrent_hash_map stores_; + tbb::flow::multifunction_node, std::tuple> unfold_; + flusher_t flusher_; std::atomic msg_counter_{}; // Is this sufficient? Probably not. std::atomic calls_{}; std::atomic product_count_{}; diff --git a/phlex/core/detail/repeater_node.cpp b/phlex/core/detail/repeater_node.cpp new file mode 100644 index 000000000..4226cc777 --- /dev/null +++ b/phlex/core/detail/repeater_node.cpp @@ -0,0 +1,166 @@ +#include "phlex/core/detail/repeater_node.hpp" + +#include "spdlog/spdlog.h" + +#include + +namespace phlex::experimental::detail { + + repeater_node::repeater_node(tbb::flow::graph& g, std::string node_name, std::string layer_name) : + base_t{g}, + indexer_{g}, + repeater_{g, + tbb::flow::unlimited, + [this](tagged_msg_t const& tagged, auto& /* outputs */) { + std::size_t key = -1ull; + if (tagged.is_a()) { + key = handle_data_message(tagged.cast_to()); + } else if (tagged.is_a()) { + key = handle_flush_token(tagged.cast_to()); + } else { + key = handle_index_message(tagged.cast_to()); + } + + cleanup_cache_entry(key); + }}, + node_name_{std::move(node_name)}, + layer_{std::move(layer_name)} + { + base_t::set_external_ports(base_t::input_ports_type{input_port<0>(indexer_), + input_port<1>(indexer_), + input_port<2>(indexer_)}, + base_t::output_ports_type{output_port<0>(repeater_)}); + make_edge(indexer_, repeater_); + } + + tbb::flow::receiver& repeater_node::data_port() { return input_port<0>(indexer_); } + + tbb::flow::receiver& repeater_node::flush_port() + { + return input_port<1>(indexer_); + } + + tbb::flow::receiver& repeater_node::index_port() + { + return input_port<2>(indexer_); + } + + repeater_node::~repeater_node() + { + if (cached_products_.empty()) { + 
return; + } + + spdlog::warn("[{}/{}] Cached messages: {}", node_name_, layer_, cached_products_.size()); + for (auto const& [_, cache] : cached_products_) { + if (cache.data_msg) { + spdlog::warn("[{}/{}] Product for {}", + node_name_, + layer_, + cache.data_msg->store->index()->to_string()); + } else { + spdlog::warn("[{}/{}] Product not yet received", node_name_, layer_); + } + } + } + + int repeater_node::emit_pending_ids(cached_product* entry) + { + assert(entry->data_msg); + int num_emitted{}; + std::size_t msg_id{}; + while (entry->msg_ids.try_pop(msg_id)) { + output_port<0>(repeater_).try_put({.store = entry->data_msg->store, .id = msg_id}); + ++num_emitted; + } + return num_emitted; + } + + std::size_t repeater_node::handle_data_message(message const& msg) + { + auto const key = msg.store->index()->hash(); + + // Pass-through mode; output directly without caching + if (!cache_enabled_) { + output_port<0>(repeater_).try_put(msg); + return key; + } + + // Caching mode; store product and drain any pending message IDs + assert(msg.store); + accessor a; + cached_products_.insert(a, key); + auto* entry = &a->second; + entry->data_msg = std::make_shared(msg); + entry->counter += emit_pending_ids(entry); + return key; + } + + std::size_t repeater_node::handle_flush_token(indexed_end_token const& token) + { + auto const& [index, count] = token; + auto const key = index->hash(); + accessor a; + cached_products_.insert(a, key); + auto* entry = &a->second; + entry->counter -= count; + std::ignore = entry->flush_received.test_and_set(); + return key; + } + + std::size_t repeater_node::handle_index_message(index_message const& msg) + { + auto const& [index, msg_id, cache] = msg; + auto const key = index->hash(); + + // Caching already disabled; no action needed + if (!cache_enabled_) { + return key; + } + + // Transition to pass-through mode; output any cached product and disable caching + if (!cache) { + cache_enabled_ = false; + if (accessor a; 
cached_products_.find(a, key)) { + auto* entry = &a->second; + if (entry->data_msg) { + output_port<0>(repeater_).try_put(*entry->data_msg); + ++entry->counter; + } + } + return key; + } + + // Normal caching mode; either output cached product or queue message ID for later + accessor a; + cached_products_.insert(a, key); + auto* entry = &a->second; + if (entry->data_msg) { + output_port<0>(repeater_).try_put({.store = entry->data_msg->store, .id = msg_id}); + entry->counter += 1 + emit_pending_ids(entry); + } else { + entry->msg_ids.push(msg_id); + } + return key; + } + + void repeater_node::cleanup_cache_entry(std::size_t key) + { + accessor a; + if (!cached_products_.find(a, key)) { + return; + } + + auto* entry = &a->second; + if (!cache_enabled_) { + if (entry->counter == 0) { + assert(entry->data_msg); + output_port<0>(repeater_).try_put(*entry->data_msg); + } + cached_products_.erase(a); + } else if (entry->flush_received.test() and entry->counter == 0) { + cached_products_.erase(a); + } + } + +} diff --git a/phlex/core/detail/repeater_node.hpp b/phlex/core/detail/repeater_node.hpp new file mode 100644 index 000000000..be8b07ceb --- /dev/null +++ b/phlex/core/detail/repeater_node.hpp @@ -0,0 +1,59 @@ +#ifndef PHLEX_CORE_DETAIL_REPEATER_NODE_HPP +#define PHLEX_CORE_DETAIL_REPEATER_NODE_HPP + +#include "phlex/core/message.hpp" + +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" + +#include +#include +#include + +namespace phlex::experimental::detail { + + using repeater_node_input = std::tuple; + + class repeater_node : public tbb::flow::composite_node> { + public: + repeater_node(tbb::flow::graph& g, std::string node_name, std::string layer_name); + + tbb::flow::receiver& data_port(); + tbb::flow::receiver& flush_port(); + tbb::flow::receiver& index_port(); + + ~repeater_node(); + + private: + using base_t = tbb::flow::composite_node>; + using tagged_msg_t = + tbb::flow::tagged_msg; + 
using multifunction_node_t = tbb::flow::multifunction_node>; + + struct cached_product { + std::shared_ptr data_msg; + tbb::concurrent_queue msg_ids{}; + std::atomic counter; + std::atomic_flag flush_received{}; + }; + + using cache_t = tbb::concurrent_hash_map; // Key is the index hash + using accessor = cache_t::accessor; + + int emit_pending_ids(cached_product* entry); + std::size_t handle_data_message(message const& msg); + std::size_t handle_flush_token(indexed_end_token const& token); + std::size_t handle_index_message(index_message const& msg); + void cleanup_cache_entry(std::size_t key); + + tbb::flow::indexer_node indexer_; + multifunction_node_t repeater_; + cache_t cached_products_; + std::atomic cache_enabled_{true}; + std::string node_name_; + std::string layer_; + }; +} + +#endif // PHLEX_CORE_DETAIL_REPEATER_NODE_HPP diff --git a/phlex/core/edge_maker.cpp b/phlex/core/edge_maker.cpp index 9f9e3362c..5f38da496 100644 --- a/phlex/core/edge_maker.cpp +++ b/phlex/core/edge_maker.cpp @@ -6,12 +6,12 @@ #include namespace phlex::experimental { - multiplexer::input_ports_t make_provider_edges(multiplexer::head_ports_t head_ports, - declared_providers& providers) + index_router::provider_input_ports_t make_provider_edges(index_router::head_ports_t head_ports, + declared_providers& providers) { assert(!head_ports.empty()); - multiplexer::input_ports_t result; + index_router::provider_input_ports_t result; for (auto const& [node_name, ports] : head_ports) { for (auto const& port : ports) { // Find the provider that has the right product name (hidden in the diff --git a/phlex/core/edge_maker.hpp b/phlex/core/edge_maker.hpp index 8aef9b260..ec7b7cc28 100644 --- a/phlex/core/edge_maker.hpp +++ b/phlex/core/edge_maker.hpp @@ -1,11 +1,12 @@ #ifndef PHLEX_CORE_EDGE_MAKER_HPP #define PHLEX_CORE_EDGE_MAKER_HPP +#include "phlex/core/declared_fold.hpp" #include "phlex/core/declared_output.hpp" #include "phlex/core/declared_provider.hpp" #include 
"phlex/core/edge_creation_policy.hpp" #include "phlex/core/filter.hpp" -#include "phlex/core/multiplexer.hpp" +#include "phlex/core/index_router.hpp" #include "oneapi/tbb/flow_graph.h" #include "spdlog/spdlog.h" @@ -24,8 +25,8 @@ namespace phlex::experimental { using product_name_t = std::string; - multiplexer::input_ports_t make_provider_edges(multiplexer::head_ports_t head_ports, - declared_providers& providers); + index_router::provider_input_ports_t make_provider_edges(index_router::head_ports_t head_ports, + declared_providers& providers); class edge_maker { public: @@ -33,8 +34,8 @@ namespace phlex::experimental { edge_maker(Args&... args); template - void operator()(tbb::flow::input_node& source, - multiplexer& multi, + void operator()(tbb::flow::graph& g, + index_router& multi, std::map& filters, declared_outputs& outputs, declared_providers& providers, @@ -42,7 +43,10 @@ namespace phlex::experimental { private: template - multiplexer::head_ports_t edges(std::map& filters, T& consumers); + index_router::head_ports_t edges(std::map& filters, T& consumers); + + template + std::map multilayer_ports(T& consumers); edge_creation_policy producers_; }; @@ -55,9 +59,9 @@ namespace phlex::experimental { } template - multiplexer::head_ports_t edge_maker::edges(std::map& filters, T& consumers) + index_router::head_ports_t edge_maker::edges(std::map& filters, T& consumers) { - multiplexer::head_ports_t result; + index_router::head_ports_t result; for (auto& [node_name, node] : consumers) { tbb::flow::receiver* collector = nullptr; if (auto coll_it = filters.find(node_name); coll_it != cend(filters)) { @@ -79,16 +83,31 @@ namespace phlex::experimental { return result; } + template + std::map edge_maker::multilayer_ports(T& consumers) + { + // Folds are not yet supported with the new caching system + if constexpr (std::same_as) { + return {}; + } else { + std::map result; + for (auto& [node_name, node] : consumers) { + if (auto const& ports = node->index_ports(); not 
ports.empty()) { + result.try_emplace(node_name, ports); + } + } + return result; + } + } + template - void edge_maker::operator()(tbb::flow::input_node& source, - multiplexer& multi, + void edge_maker::operator()(tbb::flow::graph& g, + index_router& multi, std::map& filters, declared_outputs& outputs, declared_providers& providers, Args&... consumers) { - make_edge(source, multi); - // Create edges to outputs for (auto const& [output_name, output_node] : outputs) { for (auto& [_, provider] : providers) { @@ -100,7 +119,7 @@ namespace phlex::experimental { } // Create normal edges - multiplexer::head_ports_t head_ports; + index_router::head_ports_t head_ports; (head_ports.merge(edges(filters, consumers)), ...); // Eventually, we want to look at the filled-in head_ports and // figure out what provider nodes are needed. @@ -112,7 +131,11 @@ namespace phlex::experimental { } auto provider_input_ports = make_provider_edges(std::move(head_ports), providers); - multi.finalize(std::move(provider_input_ports)); + + std::map multilayer_join_index_ports; + (multilayer_join_index_ports.merge(multilayer_ports(consumers)), ...); + + multi.finalize(g, std::move(provider_input_ports), std::move(multilayer_join_index_ports)); } } diff --git a/phlex/core/filter.cpp b/phlex/core/filter.cpp index 9ecd5063a..e2de52d87 100644 --- a/phlex/core/filter.cpp +++ b/phlex/core/filter.cpp @@ -44,13 +44,6 @@ namespace phlex::experimental { unsigned int msg_id{}; if (t.is_a()) { auto const& msg = t.cast_to(); - if (msg.store->is_flush()) { - // All flush messages are automatically forwarded to downstream ports. 
- for (std::size_t i = 0ull; i != nargs_; ++i) { - downstream_ports_[i]->try_put(msg); - } - return {}; - } msg_id = msg.id; data_.update(msg.id, msg.store); } else { diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 071b017ef..9d70909ca 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -13,61 +13,33 @@ #include namespace phlex::experimental { - layer_sentry::layer_sentry(flush_counters& counters, - message_sender& sender, - product_store_ptr store) : - counters_{counters}, sender_{sender}, store_{store}, depth_{store_->index()->depth()} - { - counters_.update(store_->index()); - } - - layer_sentry::~layer_sentry() - { - // To consider: We may want to skip the following logic if the framework prematurely - // needs to shut down. Keeping it enabled allows in-flight folds to - // complete. However, in some cases it may not be desirable to do this. - auto flush_result = counters_.extract(store_->index()); - auto flush_store = store_->make_flush(); - if (not flush_result.empty()) { - flush_store->add_product("[flush]", - std::make_shared(std::move(flush_result))); - } - sender_.send_flush(std::move(flush_store)); - } - - std::size_t layer_sentry::depth() const noexcept { return depth_; } - framework_graph::framework_graph(data_cell_index_ptr index, int const max_parallelism) : framework_graph{[index](framework_driver& driver) { driver.yield(index); }, max_parallelism} { } - // FIXME: The algorithm below should support user-specified flush stores. 
framework_graph::framework_graph(detail::next_index_t next_index, int const max_parallelism) : parallelism_limit_{static_cast(max_parallelism)}, driver_{std::move(next_index)}, src_{graph_, - [this](tbb::flow_control& fc) mutable -> message { + [this](tbb::flow_control& fc) mutable -> data_cell_index_ptr { auto item = driver_(); if (not item) { - drain(); + index_router_.drain(); fc.stop(); return {}; } - auto index = *item; - auto store = std::make_shared(index, "Source"); - return sender_.make_message(accept(std::move(store))); + + return index_router_.route(*item); }}, - multiplexer_{graph_}, - hierarchy_node_{ - graph_, tbb::flow::unlimited, [this](message const& msg) -> tbb::flow::continue_msg { - if (not msg.store->is_flush()) { - hierarchy_.increment_count(msg.store->index()); - } - return {}; - }} + index_router_{graph_}, + hierarchy_node_{graph_, + tbb::flow::unlimited, + [this](data_cell_index_ptr const& index) -> tbb::flow::continue_msg { + hierarchy_.increment_count(index); + return {}; + }} { - // FIXME: Should the loading of env levels happen in the phlex app only? spdlog::cfg::load_env_levels(); spdlog::info("Number of worker threads: {}", max_allowed_parallelism::active_value()); } @@ -76,9 +48,7 @@ namespace phlex::experimental { { if (shutdown_on_error_) { // When in an error state, we need to sanely pop the layer stack and wait for any tasks to finish. 
- while (!layers_.empty()) { - layers_.pop(); - } + index_router_.drain(); graph_.wait_for_all(); } } @@ -161,8 +131,8 @@ namespace phlex::experimental { filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.transforms)); edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds}; - make_edges(src_, - multiplexer_, + make_edges(graph_, + index_router_, filters_, nodes_.outputs, nodes_.providers, @@ -172,30 +142,38 @@ namespace phlex::experimental { nodes_.unfolds, nodes_.transforms); + std::map flushers_from_unfolds; + for (auto const& n : nodes_.unfolds | std::views::values) { + flushers_from_unfolds.try_emplace(n->child_layer(), &n->flusher()); + } + + // Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers + auto connect_with_flusher = + [this, unfold_flushers = std::move(flushers_from_unfolds)](auto& consumers) { + for (auto& n : consumers | std::views::values) { + std::set flushers; + // For providers + for (product_query const& pq : n->input()) { + if (auto it = unfold_flushers.find(pq.layer()); it != unfold_flushers.end()) { + flushers.insert(it->second); + } else { + flushers.insert(&index_router_.flusher()); + } + } + for (flusher_t* flusher : flushers) { + make_edge(*flusher, n->flush_port()); + } + } + }; + connect_with_flusher(nodes_.folds); + // The hierarchy node is used to report which data layers have been seen by the // framework. To assemble the report, data-cell indices emitted by the input node are // recorded as well as any data-cell indices emitted by an unfold. 
make_edge(src_, hierarchy_node_); for (auto& [_, node] : nodes_.unfolds) { - make_edge(node->sender(), hierarchy_node_); - } - } - - product_store_ptr framework_graph::accept(product_store_ptr store) - { - assert(store); - auto const new_depth = store->index()->depth(); - while (not empty(layers_) and new_depth <= layers_.top().depth()) { - layers_.pop(); + make_edge(node->output_index_port(), hierarchy_node_); } - layers_.emplace(counters_, sender_, store); - return store; } - void framework_graph::drain() - { - while (not empty(layers_)) { - layers_.pop(); - } - } } diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 6dc8b7e73..49a1a6c04 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -5,9 +5,8 @@ #include "phlex/core/declared_unfold.hpp" #include "phlex/core/filter.hpp" #include "phlex/core/glue.hpp" +#include "phlex/core/index_router.hpp" #include "phlex/core/message.hpp" -#include "phlex/core/message_sender.hpp" -#include "phlex/core/multiplexer.hpp" #include "phlex/core/node_catalog.hpp" #include "phlex/driver.hpp" #include "phlex/model/data_layer_hierarchy.hpp" @@ -22,8 +21,6 @@ #include #include -#include -#include #include #include #include @@ -34,19 +31,6 @@ namespace phlex { } namespace phlex::experimental { - class layer_sentry { - public: - layer_sentry(flush_counters& counters, message_sender& sender, product_store_ptr store); - ~layer_sentry(); - std::size_t depth() const noexcept; - - private: - flush_counters& counters_; - message_sender& sender_; - product_store_ptr store_; - std::size_t depth_; - }; - class framework_graph { public: explicit framework_graph(data_cell_index_ptr index, @@ -165,10 +149,6 @@ namespace phlex::experimental { void run(); void finalize(); - product_store_ptr accept(product_store_ptr store); - void drain(); - std::size_t original_message_id(product_store_ptr const& store); - resource_usage graph_resource_usage_{}; max_allowed_parallelism 
parallelism_limit_; data_layer_hierarchy hierarchy_{}; @@ -178,13 +158,9 @@ namespace phlex::experimental { tbb::flow::graph graph_{}; framework_driver driver_; std::vector registration_errors_{}; - tbb::flow::input_node src_; - multiplexer multiplexer_; - tbb::flow::function_node hierarchy_node_; - message_sender sender_{multiplexer_}; - std::queue pending_stores_; - flush_counters counters_; - std::stack layers_; + tbb::flow::input_node src_; + index_router index_router_; + tbb::flow::function_node hierarchy_node_; bool shutdown_on_error_{false}; }; } diff --git a/phlex/core/fwd.hpp b/phlex/core/fwd.hpp index 556eff89c..076f4073f 100644 --- a/phlex/core/fwd.hpp +++ b/phlex/core/fwd.hpp @@ -3,19 +3,19 @@ #include "phlex/model/fwd.hpp" +#include "oneapi/tbb/flow_graph.h" + namespace phlex::experimental { - class component; class consumer; class declared_output; class generator; + struct flush_message; class framework_graph; - class message_sender; - class multiplexer; + struct message; + class index_router; class products_consumer; + + using flusher_t = tbb::flow::broadcast_node; } #endif // PHLEX_CORE_FWD_HPP - -// Local Variables: -// mode: c++ -// End: diff --git a/phlex/core/index_router.cpp b/phlex/core/index_router.cpp new file mode 100644 index 000000000..37214f29a --- /dev/null +++ b/phlex/core/index_router.cpp @@ -0,0 +1,284 @@ +#include "phlex/core/index_router.hpp" +#include "phlex/model/product_store.hpp" + +#include "fmt/std.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include + +using namespace phlex::experimental; + +namespace { + auto delimited_layer_path(std::string layer_path) + { + if (not layer_path.starts_with("/")) { + return "/" + layer_path; + } + return layer_path; + } + + void send_messages(phlex::data_cell_index_ptr const& index, + std::size_t message_id, + phlex::experimental::detail::multilayer_slots const& slots) + { + for (auto& slot : slots) { + slot->put_message(index, 
message_id); + } + } +} + +namespace phlex::experimental { + + //======================================================================================== + // layer_scope implementation + + detail::layer_scope::layer_scope(flush_counters& counters, + flusher_t& flusher, + detail::multilayer_slots const& slots_for_layer, + data_cell_index_ptr index, + std::size_t const message_id) : + counters_{counters}, + flusher_{flusher}, + slots_{slots_for_layer}, + index_{index}, + message_id_{message_id} + { + // FIXME: Only for folds right now + counters_.update(index_); + } + + detail::layer_scope::~layer_scope() + { + // To consider: We may want to skip the following logic if the framework prematurely + // needs to shut down. Keeping it enabled allows in-flight folds to + // complete. However, in some cases it may not be desirable to do this. + + for (auto& slot : slots_) { + slot->put_end_token(index_); + } + + // The following is for fold nodes only (temporary until the release of fold results are incorporated + // into the above paradigm). 
+ auto flush_result = counters_.extract(index_); + flush_counts_ptr result; + if (not flush_result.empty()) { + result = std::make_shared(std::move(flush_result)); + } + flusher_.try_put({index_, std::move(result), message_id_}); + } + + std::size_t detail::layer_scope::depth() const { return index_->depth(); } + + //======================================================================================== + // multilayer_slot implementation + + detail::multilayer_slot::multilayer_slot(tbb::flow::graph& g, + std::string layer, + tbb::flow::receiver* flush_port, + tbb::flow::receiver* input_port) : + layer_{std::move(layer)}, broadcaster_{g}, flusher_{g} + { + make_edge(broadcaster_, *input_port); + make_edge(flusher_, *flush_port); + } + + void detail::multilayer_slot::put_message(data_cell_index_ptr const& index, + std::size_t message_id) + { + if (layer_ == index->layer_name()) { + broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false}); + return; + } + + // Flush values are only used for indices that are *not* the "lowest" in the branch + // of the hierarchy. 
+ ++counter_; + broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id}); + } + + void detail::multilayer_slot::put_end_token(data_cell_index_ptr const& index) + { + auto count = std::exchange(counter_, 0); + if (count == 0) { + // See comment above about flush values + return; + } + + flusher_.try_put({.index = index, .count = count}); + } + + bool detail::multilayer_slot::matches_exactly(std::string const& layer_path) const + { + return layer_path.ends_with(delimited_layer_path(layer_)); + } + + bool detail::multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const + { + return index->parent(layer_) != nullptr; + } + + //======================================================================================== + // index_router implementation + + index_router::index_router(tbb::flow::graph& g) : flusher_{g} {} + + void index_router::finalize(tbb::flow::graph& g, + provider_input_ports_t provider_input_ports, + std::map multilayers) + { + // We must have at least one provider port, or there can be no data to process. 
+ assert(!provider_input_ports.empty()); + provider_input_ports_ = std::move(provider_input_ports); + + // Create the index-set broadcast nodes for providers + for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) { + auto [it, _] = + broadcasters_.try_emplace(pq.layer(), std::make_shared(g)); + make_edge(*it->second, *provider_port); + } + + for (auto const& [node_name, multilayer] : multilayers) { + spdlog::trace("Making multilayer caster for {}", node_name); + detail::multilayer_slots casters; + casters.reserve(multilayer.size()); + // FIXME: Consider whether the construction of casters can be simplified + for (auto const& [layer, flush_port, input_port] : multilayer) { + auto entry = std::make_shared(g, layer, flush_port, input_port); + casters.push_back(entry); + } + multibroadcasters_.try_emplace(node_name, std::move(casters)); + } + } + + data_cell_index_ptr index_router::route(data_cell_index_ptr const index) + { + backout_to(index); + + auto message_id = received_indices_.fetch_add(1); + + send_to_provider_index_nodes(index, message_id); + auto const& slots_for_layer = send_to_multilayer_join_nodes(index, message_id); + + layers_.emplace(counters_, flusher_, slots_for_layer, index, message_id); + + return index; + } + + void index_router::backout_to(data_cell_index_ptr const index) + { + assert(index); + auto const new_depth = index->depth(); + while (not empty(layers_) and new_depth <= layers_.top().depth()) { + layers_.pop(); + } + } + + void index_router::drain() + { + while (not empty(layers_)) { + layers_.pop(); + } + } + + void index_router::send_to_provider_index_nodes(data_cell_index_ptr const& index, + std::size_t const message_id) + { + if (auto it = matched_broadcasters_.find(index->layer_hash()); + it != matched_broadcasters_.end()) { + // Not all layers will have a corresponding broadcaster + if (it->second) { + it->second->try_put({.index = index, .msg_id = message_id}); + } + return; + } + + auto broadcaster =
index_node_for(index->layer_name()); + if (broadcaster) { + broadcaster->try_put({.index = index, .msg_id = message_id}); + } + // We cache the result of the lookup even if there is no broadcaster for this layer, + // to avoid repeated lookups for layers that don't have broadcasters. + matched_broadcasters_.try_emplace(index->layer_hash(), broadcaster); + } + + detail::multilayer_slots const& index_router::send_to_multilayer_join_nodes( + data_cell_index_ptr const& index, std::size_t const message_id) + { + auto const layer_hash = index->layer_hash(); + + if (auto it = matched_routing_entries_.find(layer_hash); it != matched_routing_entries_.end()) { + send_messages(index, message_id, it->second); + return matched_flushing_entries_.find(layer_hash)->second; + } + + auto [routing_it, _] = matched_routing_entries_.try_emplace(layer_hash); + auto [flushing_it, __] = matched_flushing_entries_.try_emplace(layer_hash); + + auto const layer_path = index->layer_path(); + + // For each multi-layer join node, determine which slots are relevant to this index. + // Routing entries: All slots from a node are added if (1) at least one slot exactly + // matches the current layer, and (2) all slots either exactly match + // or are parent layers of the current index. + // Flushing entries: Only slots that exactly match the current layer are added. 
+ for (auto& [node_name, slots] : multibroadcasters_) { + detail::multilayer_slots matching_slots; + matching_slots.reserve(slots.size()); + + bool has_exact_match = false; + std::size_t matched_count = 0; + + for (auto& slot : slots) { + if (slot->matches_exactly(layer_path)) { + has_exact_match = true; + flushing_it->second.push_back(slot); + matching_slots.push_back(slot); + ++matched_count; + } else if (slot->is_parent_of(index)) { + matching_slots.push_back(slot); + ++matched_count; + } + } + + // Add all matching slots to routing entries only if we have an exact match and + // all slots from this node matched something (either exactly or as a parent). + if (has_exact_match and matched_count == slots.size()) { + routing_it->second.insert(routing_it->second.end(), + std::make_move_iterator(matching_slots.begin()), + std::make_move_iterator(matching_slots.end())); + } + } + send_messages(index, message_id, routing_it->second); + return flushing_it->second; + } + + auto index_router::index_node_for(std::string const& layer_path) -> detail::index_set_node_ptr + { + std::string const search_token = delimited_layer_path(layer_path); + + std::vector candidates; + for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) { + if (search_token.ends_with(delimited_layer_path(it->first))) { + candidates.push_back(it); + } + } + + if (candidates.size() == 1ull) { + return candidates[0]->second; + } + + if (candidates.empty()) { + return nullptr; + } + + std::string msg{"Multiple layers match specification " + layer_path + ":\n"}; + for (auto const& it : candidates) { + msg += "\n- " + it->first; + } + throw std::runtime_error(msg); + } +} diff --git a/phlex/core/index_router.hpp b/phlex/core/index_router.hpp new file mode 100644 index 000000000..f52ce1a54 --- /dev/null +++ b/phlex/core/index_router.hpp @@ -0,0 +1,139 @@ +#ifndef PHLEX_CORE_INDEX_ROUTER_HPP +#define PHLEX_CORE_INDEX_ROUTER_HPP + +#include "phlex/core/fwd.hpp" +#include 
"phlex/core/message.hpp" +#include "phlex/model/data_cell_counter.hpp" +#include "phlex/model/data_cell_index.hpp" + +#include "oneapi/tbb/flow_graph.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace phlex::experimental { + namespace detail { + using index_set_node = tbb::flow::broadcast_node; + using index_set_node_ptr = std::shared_ptr; + using flush_node = tbb::flow::broadcast_node; + + // ========================================================================================== + // A multilayer_slot manages routing and flushing for a single layer slot (a repeater) in + // multi-layer join nodes. Each slot corresponds to one input data layer of a multi-layer + // join operation. It: + // (a) routes index messages to either the matching layer or its data-layer parent, and + // (b) emits flush tokens to the repeater to evict a cached data product from memory. + class multilayer_slot { + public: + multilayer_slot(tbb::flow::graph& g, + std::string layer, + tbb::flow::receiver* flush_port, + tbb::flow::receiver* input_port); + + void put_message(data_cell_index_ptr const& index, std::size_t message_id); + void put_end_token(data_cell_index_ptr const& index); + + bool matches_exactly(std::string const& layer_path) const; + bool is_parent_of(data_cell_index_ptr const& index) const; + + private: + std::string layer_; + detail::index_set_node broadcaster_; + detail::flush_node flusher_; + int counter_ = 0; + }; + + using multilayer_slots = std::vector>; + + // A layer_scope object is an RAII object that manages layer-scoped operations during + // data-cell-index routing. It updates flush counters on construction and ensures cleanup + // (flushing end tokens and releasing fold results) on destruction. 
+ class layer_scope { + public: + layer_scope(flush_counters& counters, + flusher_t& flusher, + multilayer_slots const& slots_for_layer, + data_cell_index_ptr index, + std::size_t message_id); + ~layer_scope(); + std::size_t depth() const; + + private: + flush_counters& counters_; + flusher_t& flusher_; + multilayer_slots const& slots_; + data_cell_index_ptr index_; + std::size_t message_id_; + }; + } + + class index_router { + public: + struct named_input_port { + product_query product_label; + tbb::flow::receiver* port; + }; + using named_input_ports_t = std::vector; + + // map of node name to its input ports + using head_ports_t = std::map; + + struct provider_input_port_t { + product_query product_label; + tbb::flow::receiver* port; + }; + using provider_input_ports_t = std::map; + + explicit index_router(tbb::flow::graph& g); + data_cell_index_ptr route(data_cell_index_ptr index); + + void finalize(tbb::flow::graph& g, + provider_input_ports_t provider_input_ports, + std::map multilayers); + void drain(); + flusher_t& flusher() { return flusher_; } + + private: + void backout_to(data_cell_index_ptr store); + void send_to_provider_index_nodes(data_cell_index_ptr const& index, std::size_t message_id); + detail::multilayer_slots const& send_to_multilayer_join_nodes(data_cell_index_ptr const& index, + std::size_t message_id); + detail::index_set_node_ptr index_node_for(std::string const& layer); + + provider_input_ports_t provider_input_ports_; + std::atomic received_indices_{}; + flusher_t flusher_; + flush_counters counters_; + std::stack layers_; + + // ========================================================================================== + // Routing to provider nodes + // The following maps are used to route data-cell indices to provider nodes. + // The first map is from layer name to the corresponding broadcaster node. 
+ std::unordered_map broadcasters_; + // The second map is a cache from a layer hash matched to a broadcaster node, to avoid + // repeated lookups for the same layer. + std::unordered_map matched_broadcasters_; + + // ========================================================================================== + // Routing to multi-layer join nodes + // The first map is from the node name to the corresponding broadcaster nodes and flush + // nodes. + std::unordered_map multibroadcasters_; + // The second map is a cache from a layer hash matched to a set of multilayer slots, to + // avoid repeated lookups for the same layer. + std::unordered_map matched_routing_entries_; + // The third map is a cache from a layer hash matched to a set of multilayer slots for the + // purposes of flushing, to avoid repeated lookups for the same layer during flushing. + std::unordered_map matched_flushing_entries_; + }; + +} + +#endif // PHLEX_CORE_INDEX_ROUTER_HPP diff --git a/phlex/core/message.cpp b/phlex/core/message.cpp index be416d999..1378b60f0 100644 --- a/phlex/core/message.cpp +++ b/phlex/core/message.cpp @@ -8,7 +8,7 @@ namespace phlex::experimental { - std::size_t MessageHasher::operator()(message const& msg) const noexcept { return msg.id; } + std::size_t message_matcher::operator()(message const& msg) const noexcept { return msg.id; } message const& more_derived(message const& a, message const& b) { @@ -31,9 +31,4 @@ namespace phlex::experimental { } return std::distance(b, it); } - - detail::no_join::no_join(tbb::flow::graph& g, MessageHasher) : - no_join_base_t{g, tbb::flow::unlimited, [](message const& msg) { return std::tuple{msg}; }} - { - } } diff --git a/phlex/core/message.hpp b/phlex/core/message.hpp index d1e188c5f..4567f9f02 100644 --- a/phlex/core/message.hpp +++ b/phlex/core/message.hpp @@ -3,6 +3,7 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/product_query.hpp" +#include "phlex/model/fwd.hpp" #include "phlex/model/handle.hpp" #include 
"phlex/model/product_store.hpp" #include "phlex/utilities/sized_tuple.hpp" @@ -11,89 +12,62 @@ #include #include +#include #include #include #include namespace phlex::experimental { + struct index_message { + data_cell_index_ptr index; + std::size_t msg_id; + bool cache{true}; + }; + + // FIXME: Do we need both indexed_end_token and flush_message? + struct indexed_end_token { + data_cell_index_ptr index; + int count; + }; + + struct flush_message { + data_cell_index_ptr index; + flush_counts_ptr counts; + std::size_t original_id; // FIXME: Used only by folds + }; + struct message { + // FIXME: Maybe consider adding an 'index' data member? product_store_const_ptr store; std::size_t id; - std::size_t original_id{-1ull}; // Used during flush }; - template - using messages_t = sized_tuple; - - struct MessageHasher { + struct message_matcher { std::size_t operator()(message const& msg) const noexcept; }; - // Overload for use with most_derived - message const& more_derived(message const& a, message const& b); - - namespace detail { - template - using join_messages_t = tbb::flow::join_node, tbb::flow::tag_matching>; - using no_join_base_t = - tbb::flow::function_node, tbb::flow::lightweight>; - - struct no_join : no_join_base_t { - no_join(tbb::flow::graph& g, MessageHasher); - }; - } + template + using message_tuple = sized_tuple; template - using join_or_none_t = std::conditional_t>; + using messages_t = std::conditional_t>; - template - auto make_join_or_none(tbb::flow::graph& g, std::index_sequence) - { - return join_or_none_t{g, type_t{}...}; - } + struct named_index_port { + std::string layer; + tbb::flow::receiver* token_port; + tbb::flow::receiver* index_port; + }; + using named_index_ports = std::vector; - template - std::vector*> input_ports(join_or_none_t& join) - { - if constexpr (N == 1ull) { - return {&join}; - } else { - return [&join]( - std::index_sequence) -> std::vector*> { - return {&input_port(join)...}; - }(std::make_index_sequence{}); - } - } + // 
Overload for use with most_derived + message const& more_derived(message const& a, message const& b); + + // Non-template overload for single message case + inline message const& most_derived(message const& msg) { return msg; } std::size_t port_index_for(product_queries const& product_labels, product_query const& product_label); - - template - tbb::flow::receiver& receiver_for(detail::join_messages_t& join, - std::size_t const index) - { - if constexpr (I < N) { - if (I != index) { - return receiver_for(join, index); - } - return input_port(join); - } - throw std::runtime_error("Should never get here"); - } - - template - tbb::flow::receiver& receiver_for(join_or_none_t& join, - product_queries const& product_labels, - product_query const& product_label) - { - if constexpr (N > 1ull) { - auto const index = port_index_for(product_labels, product_label); - return receiver_for<0ull, N>(join, index); - } else { - return join; - } - } } #endif // PHLEX_CORE_MESSAGE_HPP diff --git a/phlex/core/message_sender.cpp b/phlex/core/message_sender.cpp deleted file mode 100644 index 670b19cbe..000000000 --- a/phlex/core/message_sender.cpp +++ /dev/null @@ -1,38 +0,0 @@ -#include "phlex/core/message_sender.hpp" -#include "phlex/core/multiplexer.hpp" -#include "phlex/model/product_store.hpp" - -#include - -namespace phlex::experimental { - message_sender::message_sender(multiplexer& mplexer) : multiplexer_{mplexer} {} - - message message_sender::make_message(product_store_ptr store) - { - assert(store); - assert(not store->is_flush()); - auto const message_id = ++calls_; - original_message_ids_.try_emplace(store->index(), message_id); - return {store, message_id, -1ull}; - } - - void message_sender::send_flush(product_store_ptr store) - { - assert(store); - assert(store->is_flush()); - auto const message_id = ++calls_; - message const msg{store, message_id, original_message_id(store)}; - multiplexer_.try_put(std::move(msg)); - } - - std::size_t 
message_sender::original_message_id(product_store_ptr const& store) - { - assert(store); - assert(store->is_flush()); - - auto h = original_message_ids_.extract(store->index()); - assert(h); - return h.mapped(); - } - -} diff --git a/phlex/core/message_sender.hpp b/phlex/core/message_sender.hpp deleted file mode 100644 index 5aea9357a..000000000 --- a/phlex/core/message_sender.hpp +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef PHLEX_CORE_MESSAGE_SENDER_HPP -#define PHLEX_CORE_MESSAGE_SENDER_HPP - -#include "phlex/core/fwd.hpp" -#include "phlex/core/message.hpp" -#include "phlex/core/multiplexer.hpp" -#include "phlex/model/fwd.hpp" - -#include - -namespace phlex::experimental { - - class message_sender { - public: - explicit message_sender(multiplexer& mplexer); - - void send_flush(product_store_ptr store); - message make_message(product_store_ptr store); - - private: - std::size_t original_message_id(product_store_ptr const& store); - - multiplexer& multiplexer_; - std::map original_message_ids_; - std::size_t calls_{}; - }; - -} - -#endif // PHLEX_CORE_MESSAGE_SENDER_HPP diff --git a/phlex/core/multilayer_join_node.hpp b/phlex/core/multilayer_join_node.hpp new file mode 100644 index 000000000..cb0d4ac19 --- /dev/null +++ b/phlex/core/multilayer_join_node.hpp @@ -0,0 +1,180 @@ +#ifndef PHLEX_CORE_MULTILAYER_JOIN_NODE_HPP +#define PHLEX_CORE_MULTILAYER_JOIN_NODE_HPP + +#include "phlex/core/detail/repeater_node.hpp" +#include "phlex/core/message.hpp" + +#include "oneapi/tbb/flow_graph.h" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + template + using multilayer_join_node_base_t = tbb::flow::composite_node>; + + template + requires(n_inputs > 1) + class multilayer_join_node : public multilayer_join_node_base_t> { + using base_t = multilayer_join_node_base_t>; + using input_t = typename base_t::input_ports_type; + using output_t = typename base_t::output_ports_type; + + using args_t = message_tuple; + + template + static auto 
make_join(tbb::flow::graph& g, std::index_sequence) + { + return tbb::flow::join_node{ + g, type_t{}...}; + } + + public: + multilayer_join_node(tbb::flow::graph& g, + std::string const& node_name, + std::vector layer_names) : + base_t{g}, + join_{make_join(g, std::make_index_sequence{})}, + name_{node_name}, + layers_{std::move(layer_names)} + { + assert(n_inputs == layers_.size()); + + // Removes duplicates + std::set collapsed_layers{layers_.begin(), layers_.end()}; + + // Add repeaters only if there are non-duplicate layer specifications + if (collapsed_layers.size() > 1) { + repeaters_.reserve(n_inputs); + for (auto const& layer : layers_) { + repeaters_.push_back(std::make_unique(g, name_, layer)); + } + } + + auto set_ports = [this](std::index_sequence) { + if (repeaters_.empty()) { + // No repeating behavior necessary if all specified layer names are the same + // Just use TBB's join_node. + this->set_external_ports(input_t{input_port(join_)...}, output_t{join_}); + } else { + this->set_external_ports(input_t{repeaters_[Is]->data_port()...}, output_t{join_}); + // Connect repeaters to join + (make_edge(*repeaters_[Is], input_port(join_)), ...); + } + }; + + set_ports(std::make_index_sequence{}); + } + + std::vector index_ports() + { + // Returns an empty list if no repeaters are required + std::vector result; + result.reserve(repeaters_.size()); + for (std::size_t i = 0; i != n_inputs; ++i) { + result.emplace_back(layers_[i], &repeaters_[i]->flush_port(), &repeaters_[i]->index_port()); + } + return result; + } + + private: + std::vector> repeaters_; + tbb::flow::join_node join_; + std::string const name_; + std::vector const layers_; + }; + + namespace detail { + // Stateless placeholder for cases where no join is needed + struct no_join { + named_index_ports index_ports() const { return {}; } + }; + + template + struct pre_node { + using type = multilayer_join_node; + }; + + template <> + struct pre_node<1ull> { + using type = no_join; + }; + } + + 
template + using join_or_none_t = typename detail::pre_node::type; + + template + join_or_none_t make_join_or_none(tbb::flow::graph& g, + std::string const& node_name, + std::vector const& layers) + { + if constexpr (N > 1ull) { + return multilayer_join_node{g, node_name, layers}; + } else { + return detail::no_join{}; + } + } + + template + tbb::flow::receiver& receiver_for(multilayer_join_node& join, std::size_t const index) + { + if constexpr (I < N) { + if (I != index) { + return receiver_for(join, index); + } + return input_port(join); + } + throw std::runtime_error("Should never get here"); + } + + namespace detail { + template + std::vector*> input_ports(join_or_none_t& join) + { + static_assert(N > 1ull, "input_ports should not be called for N=1"); + return [&join]( + std::index_sequence) -> std::vector*> { + return {&input_port(join)...}; + }(std::make_index_sequence{}); + } + + template + tbb::flow::receiver& receiver_for(join_or_none_t& join, + product_queries const& product_labels, + product_query const& product_label) + { + static_assert(N > 1ull, "receiver_for should not be called for N=1"); + auto const index = port_index_for(product_labels, product_label); + return receiver_for<0ull, N>(join, index); + } + } + + template + std::vector*> input_ports(join_or_none_t& join, Node& node) + { + if constexpr (N == 1ull) { + return {&node}; + } else { + return detail::input_ports(join); + } + } + + template + tbb::flow::receiver& receiver_for(join_or_none_t& join, + product_queries const& product_labels, + product_query const& product_label, + Node& node) + { + if constexpr (N == 1ull) { + return node; + } else { + return detail::receiver_for(join, product_labels, product_label); + } + } +} + +#endif // PHLEX_CORE_MULTILAYER_JOIN_NODE_HPP diff --git a/phlex/core/multiplexer.cpp b/phlex/core/multiplexer.cpp deleted file mode 100644 index b8e7e3753..000000000 --- a/phlex/core/multiplexer.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include "phlex/core/multiplexer.hpp" 
-#include "phlex/model/product_store.hpp" - -#include "fmt/std.h" -#include "oneapi/tbb/flow_graph.h" -#include "spdlog/spdlog.h" - -#include -#include -#include -#include - -using namespace phlex::experimental; - -namespace { - product_store_const_ptr store_for(product_store_const_ptr store, - std::string const& port_product_layer) - { - if (store->index()->layer_name() == port_product_layer) { - // This store's layer matches what is expected by the port - return store; - } - - if (auto index = store->index()->parent(port_product_layer)) { - // This store has a parent layer that matches what is expected by the port - return std::make_shared(index, store->source()); - } - - return nullptr; - } -} - -namespace phlex::experimental { - - multiplexer::multiplexer(tbb::flow::graph& g, bool debug) : - base{g, tbb::flow::unlimited, std::bind_front(&multiplexer::multiplex, this)}, debug_{debug} - { - } - - void multiplexer::finalize(input_ports_t provider_input_ports) - { - // We must have at least one provider port, or there can be no data to process. 
- assert(!provider_input_ports.empty()); - provider_input_ports_ = std::move(provider_input_ports); - } - - tbb::flow::continue_msg multiplexer::multiplex(message const& msg) - { - ++received_messages_; - auto const& [store, message_id, _] = msg; - if (debug_) { - spdlog::debug("Multiplexing {} with ID {} (is flush: {})", - store->index()->to_string(), - message_id, - store->is_flush()); - } - - if (store->is_flush()) { - for (auto const& [_, port] : provider_input_ports_ | std::views::values) { - port->try_put(msg); - } - return {}; - } - - for (auto const& [product_label, port] : provider_input_ports_ | std::views::values) { - if (auto store_to_send = store_for(store, product_label.layer())) { - port->try_put({std::move(store_to_send), message_id}); - } - } - - return {}; - } -} diff --git a/phlex/core/multiplexer.hpp b/phlex/core/multiplexer.hpp deleted file mode 100644 index 846aed779..000000000 --- a/phlex/core/multiplexer.hpp +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef PHLEX_CORE_MULTIPLEXER_HPP -#define PHLEX_CORE_MULTIPLEXER_HPP - -#include "phlex/core/message.hpp" -#include "phlex/model/data_cell_index.hpp" - -#include "oneapi/tbb/concurrent_hash_map.h" -#include "oneapi/tbb/flow_graph.h" - -#include -#include -#include -#include -#include - -namespace phlex::experimental { - - class multiplexer : public tbb::flow::function_node { - using base = tbb::flow::function_node; - - public: - struct named_input_port { - product_query product_label; - tbb::flow::receiver* port; - }; - using named_input_ports_t = std::vector; - // map of node name to its input ports - using head_ports_t = std::map; - using input_ports_t = std::map; - - explicit multiplexer(tbb::flow::graph& g, bool debug = false); - tbb::flow::continue_msg multiplex(message const& msg); - - void finalize(input_ports_t provider_input_ports); - - private: - input_ports_t provider_input_ports_; - bool debug_; - std::atomic received_messages_{}; - }; - -} - -#endif // PHLEX_CORE_MULTIPLEXER_HPP diff --git 
a/phlex/core/products_consumer.cpp b/phlex/core/products_consumer.cpp index f8dca1854..700a0b361 100644 --- a/phlex/core/products_consumer.cpp +++ b/phlex/core/products_consumer.cpp @@ -1,11 +1,25 @@ #include "phlex/core/products_consumer.hpp" +namespace { + std::vector layers_from(phlex::product_queries const& queries) + { + std::vector result; + result.reserve(queries.size()); + for (auto const& query : queries) { + result.push_back(query.layer()); + } + return result; + } +} + namespace phlex::experimental { products_consumer::products_consumer(algorithm_name name, std::vector predicates, product_queries input_products) : - consumer{std::move(name), std::move(predicates)}, input_products_{std::move(input_products)} + consumer{std::move(name), std::move(predicates)}, + input_products_{std::move(input_products)}, + layers_{layers_from(input_products_)} { } @@ -19,4 +33,5 @@ namespace phlex::experimental { } product_queries const& products_consumer::input() const noexcept { return input_products_; } + std::vector const& products_consumer::layers() const noexcept { return layers_; } } diff --git a/phlex/core/products_consumer.hpp b/phlex/core/products_consumer.hpp index ee7a12532..d039a43bb 100644 --- a/phlex/core/products_consumer.hpp +++ b/phlex/core/products_consumer.hpp @@ -25,8 +25,10 @@ namespace phlex::experimental { std::size_t num_inputs() const; product_queries const& input() const noexcept; + std::vector const& layers() const noexcept; tbb::flow::receiver& port(product_query const& product_label); + virtual named_index_ports index_ports() = 0; virtual std::vector*> ports() = 0; virtual std::size_t num_calls() const = 0; @@ -41,6 +43,7 @@ namespace phlex::experimental { virtual tbb::flow::receiver& port_for(product_query const& product_label) = 0; product_queries input_products_; + std::vector layers_; }; } diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index 1a8afdeb1..9938a20c6 100644 --- a/phlex/core/store_counters.cpp +++ 
b/phlex/core/store_counters.cpp @@ -1,64 +1,25 @@ #include "phlex/core/store_counters.hpp" +#include "phlex/core/message.hpp" #include "phlex/model/data_cell_counter.hpp" #include "fmt/std.h" #include "spdlog/spdlog.h" -namespace phlex::experimental { - - void store_flag::flush_received(std::size_t const original_message_id) - { - flush_received_ = true; - original_message_id_ = original_message_id; - } - - bool store_flag::is_complete() const noexcept { return processed_ and flush_received_; } +#include - void store_flag::mark_as_processed() noexcept { processed_ = true; } - - unsigned int store_flag::original_message_id() const noexcept { return original_message_id_; } - - void detect_flush_flag::mark_flush_received(data_cell_index::hash_type const hash, - std::size_t const original_message_id) - { - flag_accessor fa; - if (flags_.insert(fa, hash)) { - fa->second = std::make_unique(); - } - fa->second->flush_received(original_message_id); - } - - void detect_flush_flag::mark_processed(data_cell_index::hash_type const hash) - { - flag_accessor fa; - if (flags_.insert(fa, hash)) { - fa->second = std::make_unique(); - } - fa->second->mark_as_processed(); - } - - bool detect_flush_flag::done_with(product_store_const_ptr const& store) - { - auto const h = store->index()->hash(); - if (flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) { - return flags_.erase(fa); - } - return false; - } - - // ===================================================================================== +namespace phlex::experimental { - void store_counter::set_flush_value(product_store_const_ptr const& store, + void store_counter::set_flush_value(flush_counts_ptr counts, std::size_t const original_message_id) { - if (not store->contains_product("[flush]")) { + if (not counts) { return; } #ifdef __cpp_lib_atomic_shared_ptr - flush_counts_ = store->get_product("[flush]"); + flush_counts_ = counts; #else - atomic_store(&flush_counts_, store->get_product("[flush]")); + 
atomic_store(&flush_counts_, counts); #endif original_message_id_ = original_message_id; } diff --git a/phlex/core/store_counters.hpp b/phlex/core/store_counters.hpp index f509c7a8c..8363a93d8 100644 --- a/phlex/core/store_counters.hpp +++ b/phlex/core/store_counters.hpp @@ -14,40 +14,9 @@ #include namespace phlex::experimental { - class store_flag { - public: - void flush_received(std::size_t original_message_id); - bool is_complete() const noexcept; - void mark_as_processed() noexcept; - unsigned int original_message_id() const noexcept; - - private: - std::atomic flush_received_{false}; - std::atomic processed_{false}; - std::atomic - original_message_id_{}; // Necessary for matching inputs to downstream join nodes. - }; - - class detect_flush_flag { - protected: - void mark_flush_received(data_cell_index::hash_type hash, std::size_t original_message_id); - void mark_processed(data_cell_index::hash_type hash); - bool done_with(product_store_const_ptr const& store); - - private: - using flags_t = - tbb::concurrent_hash_map>; - using flag_accessor = flags_t::accessor; - using const_flag_accessor = flags_t::const_accessor; - - flags_t flags_; - }; - - // ========================================================================= - class store_counter { public: - void set_flush_value(product_store_const_ptr const& ptr, std::size_t original_message_id); + void set_flush_value(flush_counts_ptr counts, std::size_t original_message_id); void increment(data_cell_index::hash_type layer_hash); bool is_complete(); unsigned int original_message_id() const noexcept; diff --git a/phlex/model/data_cell_counter.hpp b/phlex/model/data_cell_counter.hpp index f744a18b1..ff5f0d50c 100644 --- a/phlex/model/data_cell_counter.hpp +++ b/phlex/model/data_cell_counter.hpp @@ -33,8 +33,6 @@ namespace phlex::experimental { std::map child_counts_{}; }; - using flush_counts_ptr = std::shared_ptr; - class data_cell_counter { public: data_cell_counter(); diff --git a/phlex/model/fwd.hpp 
b/phlex/model/fwd.hpp index d9734c607..a803637c7 100644 --- a/phlex/model/fwd.hpp +++ b/phlex/model/fwd.hpp @@ -6,12 +6,12 @@ namespace phlex::experimental { class data_cell_counter; class data_layer_hierarchy; + class flush_counts; class product_store; + using flush_counts_ptr = std::shared_ptr; using product_store_const_ptr = std::shared_ptr; using product_store_ptr = std::shared_ptr; - - enum class stage { process, flush }; } namespace phlex { diff --git a/phlex/model/product_store.cpp b/phlex/model/product_store.cpp index 6985ec427..ea43db2dc 100644 --- a/phlex/model/product_store.cpp +++ b/phlex/model/product_store.cpp @@ -6,14 +6,8 @@ namespace phlex::experimental { - product_store::product_store(data_cell_index_ptr id, - std::string source, - products new_products, - stage processing_stage) : - products_{std::move(new_products)}, - id_{std::move(id)}, - source_{std::move(source)}, - stage_{processing_stage} + product_store::product_store(data_cell_index_ptr id, std::string source, products new_products) : + products_{std::move(new_products)}, id_{std::move(id)}, source_{std::move(source)} { } @@ -24,15 +18,9 @@ namespace phlex::experimental { return product_store_ptr{new product_store{data_cell_index::base_ptr(), std::move(base_name)}}; } - product_store_ptr product_store::make_flush() const - { - return product_store_ptr{new product_store{id_, "[inserted]", {}, stage::flush}}; - } - std::string const& product_store::layer_name() const noexcept { return id_->layer_name(); } std::string const& product_store::source() const noexcept { return source_; } data_cell_index_ptr const& product_store::index() const noexcept { return id_; } - bool product_store::is_flush() const noexcept { return stage_ == stage::flush; } bool product_store::contains_product(std::string const& product_name) const { diff --git a/phlex/model/product_store.hpp b/phlex/model/product_store.hpp index 22eeeb620..c7fc1b4cc 100644 --- a/phlex/model/product_store.hpp +++ 
b/phlex/model/product_store.hpp @@ -13,13 +13,11 @@ #include namespace phlex::experimental { - class product_store { public: explicit product_store(data_cell_index_ptr id, std::string source = "Source", - products new_products = {}, - stage processing_stage = stage::process); + products new_products = {}); ~product_store(); static product_store_ptr base(std::string base_name = "Source"); @@ -30,9 +28,7 @@ namespace phlex::experimental { std::string const& layer_name() const noexcept; std::string const& source() const noexcept; - product_store_ptr make_flush() const; data_cell_index_ptr const& index() const noexcept; - bool is_flush() const noexcept; // Product interface bool contains_product(std::string const& key) const; @@ -55,13 +51,16 @@ namespace phlex::experimental { data_cell_index_ptr id_; std::string source_; // FIXME: Should not have to copy the string (the source should outlive the product store) - stage stage_; }; product_store_ptr const& more_derived(product_store_ptr const& a, product_store_ptr const& b); - template - Element const& get_most_derived(Tuple const& tup, Element const& element) + // Non-template overload for single product_store_ptr case + inline product_store_ptr const& most_derived(product_store_ptr const& store) { return store; } + + // Generic most_derived for tuples + template + auto const& get_most_derived(Tuple const& tup, std::tuple_element_t const& element) { constexpr auto N = std::tuple_size_v; if constexpr (I == N - 1) { @@ -71,16 +70,10 @@ namespace phlex::experimental { } } - template - auto const& most_derived(Tuple const& tup) + template + auto const& most_derived(std::tuple const& elements) { - constexpr auto N = std::tuple_size_v; - static_assert(N > 0ull); - if constexpr (N == 1ull) { - return std::get<0>(tup); - } else { - return get_most_derived<1ull>(tup, std::get<0>(tup)); - } + return get_most_derived<1ull>(elements, std::get<0>(elements)); } // Implementation details diff --git a/test/CMakeLists.txt 
b/test/CMakeLists.txt index 715ecbf9d..28a203f10 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -180,6 +180,14 @@ cet_test( spdlog::spdlog layer_generator ) +cet_test( + repeater_node + USE_CATCH2_MAIN + SOURCE + repeater_node_test.cpp + LIBRARIES + phlex::core +) cet_test( replicated USE_CATCH2_MAIN diff --git a/test/allowed_families.cpp b/test/allowed_families.cpp index dcf7e574e..7eef06780 100644 --- a/test/allowed_families.cpp +++ b/test/allowed_families.cpp @@ -55,12 +55,11 @@ TEST_CASE("Testing families", "[data model]") .input_family("id"_in("run"), "id"_in("subrun"), "id"_in("event")); g.execute(); - // FIXME: Need to improve the synchronization to supply strict equality CHECK(g.execution_count("se") == 1ull); - CHECK(g.execution_count("rs") >= 1ull); + CHECK(g.execution_count("rs") == 1ull); CHECK(g.execution_count("rse") == 1ull); - CHECK(g.execution_count("run_id_provider") >= 1ull); - CHECK(g.execution_count("subrun_id_provider") >= 1ull); + CHECK(g.execution_count("run_id_provider") == 1ull); + CHECK(g.execution_count("subrun_id_provider") == 1ull); CHECK(g.execution_count("event_id_provider") == 1ull); } diff --git a/test/cached_execution.cpp b/test/cached_execution.cpp index 51372eb7d..70ac37e7e 100644 --- a/test/cached_execution.cpp +++ b/test/cached_execution.cpp @@ -1,7 +1,7 @@ // ======================================================================================= // This test executes the following graph // -// Multiplexer +// Index Router // | | | // A1 | | // |\ | | @@ -90,13 +90,12 @@ TEST_CASE("Cached function calls", "[data model]") g.execute(); - // FIXME: Need to improve the synchronization to supply strict equality - CHECK(g.execution_count("A1") >= n_runs); - CHECK(g.execution_count("A2") >= n_runs); - CHECK(g.execution_count("A3") >= n_runs); + CHECK(g.execution_count("A1") == n_runs); + CHECK(g.execution_count("A2") == n_runs); + CHECK(g.execution_count("A3") == n_runs); - CHECK(g.execution_count("B1") >= n_runs * 
n_subruns); - CHECK(g.execution_count("B2") >= n_runs * n_subruns); + CHECK(g.execution_count("B1") == n_runs * n_subruns); + CHECK(g.execution_count("B2") == n_runs * n_subruns); CHECK(g.execution_count("C") == n_runs * n_subruns * n_events); } diff --git a/test/different_hierarchies.cpp b/test/different_hierarchies.cpp index 8ebf34468..0d010ac96 100644 --- a/test/different_hierarchies.cpp +++ b/test/different_hierarchies.cpp @@ -1,7 +1,7 @@ // ======================================================================================= // This test executes the following graph // -// Multiplexer +// Index Router // | | // job_add(*) run_add(^) // | | @@ -25,7 +25,7 @@ // As the run_add node performs folds only over "runs", any top-level "events" // stores are excluded from the fold result. // -// N.B. The multiplexer sends data products to nodes based on the name of the lowest +// N.B. The index_router sends data products to nodes based on the name of the lowest // layer. For example, the top-level "event" and the nested "run/event" are both // candidates for the "job" fold. 
// ======================================================================================= diff --git a/test/fold.cpp b/test/fold.cpp index cadcb525e..3910dbb28 100644 --- a/test/fold.cpp +++ b/test/fold.cpp @@ -2,7 +2,7 @@ /* This test executes the following graph - Multiplexer + Index Router / \ / \ job_add(*) run_add(^) diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index a3bd0dd86..1633e09e6 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -1,7 +1,7 @@ // ======================================================================================= // This test executes the following graph // -// Multiplexer +// Index Router // | | // get_time square // | | @@ -134,7 +134,7 @@ TEST_CASE("Hierarchical nodes", "[graph]") CHECK(g.execution_count("square") == index_limit * number_limit); CHECK(g.execution_count("add") == index_limit * number_limit); - CHECK(g.execution_count("get_the_time") >= index_limit); + CHECK(g.execution_count("get_the_time") == index_limit); CHECK(g.execution_count("scale") == index_limit); CHECK(g.execution_count("print_result") == index_limit); } diff --git a/test/product_store.cpp b/test/product_store.cpp index 0409e4a3f..0ad77c904 100644 --- a/test/product_store.cpp +++ b/test/product_store.cpp @@ -43,9 +43,8 @@ TEST_CASE("Product store derivation", "[data model]") SECTION("Only one store") { auto store = product_store::base(); - auto stores = std::make_tuple(store); CHECK(store == more_derived(store, store)); - CHECK(store == most_derived(stores)); + CHECK(store == most_derived(store)); } auto root = product_store::base(); diff --git a/test/repeater_node_test.cpp b/test/repeater_node_test.cpp new file mode 100644 index 000000000..573870529 --- /dev/null +++ b/test/repeater_node_test.cpp @@ -0,0 +1,341 @@ +#include "phlex/core/detail/repeater_node.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/product_store.hpp" + +#include "catch2/catch_test_macros.hpp" +#include 
"catch2/matchers/catch_matchers_string.hpp" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/spdlog.h" + +#include <algorithm> +#include <memory> +#include <ranges> +#include <sstream> +#include <vector> + +using namespace oneapi; +using namespace phlex; +using namespace phlex::experimental; + +namespace { + auto make_test_store(data_cell_index_ptr const& index, int value) + { + auto store = std::make_shared<product_store>(index); + store->add_product("value", value); + return store; + } + + class message_collector : public tbb::flow::function_node<message> { + public: + explicit message_collector(tbb::flow::graph& g) : + tbb::flow::function_node<message>{ + g, tbb::flow::serial, [this](message const& msg) { messages_.push_back(msg); }} + { + } + + auto const& messages() const noexcept { return messages_; } + + auto sorted_message_ids() const + { + auto ids = messages_ | std::views::transform([](auto const& msg) { return msg.id; }) | + std::ranges::to<std::vector>(); + std::ranges::sort(ids); + return ids; + } + + private: + std::vector<message> messages_; + }; + + void use_ostream_logger(std::ostringstream& oss) + { + auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>(oss); + auto ostream_logger = std::make_shared<spdlog::logger>("my_logger", ostream_sink); + spdlog::set_default_logger(ostream_logger); + } + + class repeater_test_fixture { + public: + repeater_test_fixture(std::string node_name, std::string layer_name = "run") : + repeater_{g_, std::move(node_name), std::move(layer_name)}, consumer_{g_} + { + make_edge(repeater_, consumer_); + } + + void put_data_message(message const& msg) { repeater_.data_port().try_put(msg); } + void put_index_message(index_message const& msg) { repeater_.index_port().try_put(msg); } + void put_flush_token(indexed_end_token const& token) { repeater_.flush_port().try_put(token); } + + void wait_for_all() { g_.wait_for_all(); } + + auto const& consumed_messages() const { return consumer_.messages(); } + auto sorted_message_ids() const { return consumer_.sorted_message_ids(); } + + private: + tbb::flow::graph g_; + detail::repeater_node repeater_; + 
message_collector consumer_; + }; +} + +TEST_CASE("Test repeater pass-through mode", "[multithreading]") +{ + repeater_test_fixture fixture{"test_repeater_pass_through"}; + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send index message with cache=false (pass-through mode) + fixture.put_index_message({.index = run1, .msg_id = 1, .cache = false}); + + // Send data message + fixture.put_data_message({.store = store1, .id = 0}); + + fixture.wait_for_all(); + + // In pass-through mode, message should be received immediately + REQUIRE(fixture.consumed_messages().size() == 1); + CHECK(fixture.consumed_messages()[0].store == store1); + CHECK(fixture.consumed_messages()[0].id == 0); +} + +TEST_CASE("Test repeater caching mode", "[multithreading]") +{ + repeater_test_fixture fixture{"test_repeater_caching_mode"}; + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send index messages first (with cache=true, which is the default) + fixture.put_index_message({.index = run1, .msg_id = 1, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 2, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 3, .cache = true}); + + fixture.wait_for_all(); + + // No messages yet, waiting for data + CHECK(fixture.consumed_messages().empty()); + + // Send data message - should trigger emission of all queued message IDs + fixture.put_data_message({.store = store1, .id = 0}); + + fixture.wait_for_all(); + + // Should receive 3 messages with the same store but different IDs + REQUIRE(fixture.consumed_messages().size() == 3); + // The messages are not guaranteed to arrive at a particular order, so we sort by message ID + // before checking. 
+ CHECK(fixture.sorted_message_ids() == std::vector<std::size_t>{1, 2, 3}); + CHECK(fixture.consumed_messages()[0].store == store1); + CHECK(fixture.consumed_messages()[1].store == store1); + CHECK(fixture.consumed_messages()[2].store == store1); + + // Now evict the data (we actually test flushing in the next test, but we do this to avoid + // any warnings about cached products not being flushed) + fixture.put_flush_token({.index = run1, .count = 3}); + fixture.wait_for_all(); +} + +TEST_CASE("Test repeater with flush tokens", "[multithreading]") +{ + repeater_test_fixture fixture{"test_repeater_with_flush_tokens"}; + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send index messages + fixture.put_index_message({.index = run1, .msg_id = 1, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 2, .cache = true}); + + // Send data message + fixture.put_data_message({.store = store1, .id = 0}); + + fixture.wait_for_all(); + + REQUIRE(fixture.consumed_messages().size() == 2); + + // Send flush token with count=2 (indicating 2 messages were sent) + fixture.put_flush_token({.index = run1, .count = 2}); + + fixture.wait_for_all(); + + // Cache should be cleaned up after flush with counter reaching zero + CHECK(fixture.consumed_messages().size() == 2); +} + +TEST_CASE("Test repeater multiple indices", "[multithreading]") +{ + tbb::flow::graph g; + detail::repeater_node repeater{g, "test_repeater_multiple_indices", "run"}; + + std::atomic<int> message_count{0}; + tbb::flow::function_node<message> consumer{ + g, tbb::flow::unlimited, [&message_count](message const&) { ++message_count; }}; + + make_edge(repeater, consumer); + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto run2 = data_cell_index::base_ptr()->make_child(2, "run"); + auto store1 = make_test_store(run1, 42); + auto store2 = make_test_store(run2, 43); + + // Send index messages for different runs + 
repeater.index_port().try_put({.index = run1, .msg_id = 1, .cache = true}); + repeater.index_port().try_put({.index = run2, .msg_id = 2, .cache = true}); + repeater.index_port().try_put({.index = run1, .msg_id = 3, .cache = true}); + + // Send data for run1 + repeater.data_port().try_put({.store = store1, .id = 0}); + + g.wait_for_all(); + + // Should receive 2 messages for run1 (msg_id 1 and 3) + CHECK(message_count == 2); + + // Send data for run2 + repeater.data_port().try_put({.store = store2, .id = 0}); + + g.wait_for_all(); + + // Should now have 3 total messages + CHECK(message_count == 3); + + // Send flush tokens + repeater.flush_port().try_put({.index = run1, .count = 2}); + repeater.flush_port().try_put({.index = run2, .count = 1}); + + g.wait_for_all(); +} + +TEST_CASE("Test repeater transition to pass-through", "[multithreading]") +{ + repeater_test_fixture fixture{"test_repeater_transition_to_pass_through"}; + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Start in caching mode + fixture.put_index_message({.index = run1, .msg_id = 1, .cache = true}); + + // Send data + fixture.put_data_message({.store = store1, .id = 0}); + + fixture.wait_for_all(); + + CHECK(fixture.consumed_messages().size() == 1); + + // Transition to pass-through mode with cache=false + fixture.put_index_message({.index = run1, .msg_id = 2, .cache = false}); + + fixture.wait_for_all(); + + // The cached product should be output during transition + CHECK(fixture.consumed_messages().size() == 2); + CHECK(fixture.consumed_messages()[1].store == store1); + + // Now in pass-through mode, subsequent data should go through directly + auto store2 = make_test_store(run1, 99); + fixture.put_data_message({.store = store2, .id = 0}); + + fixture.wait_for_all(); + + CHECK(fixture.consumed_messages().size() == 3); + CHECK(fixture.consumed_messages()[2].store == store2); +} + +TEST_CASE("Test repeater data before index", 
"[multithreading]") +{ + repeater_test_fixture fixture{"test_repeater_data_before_index"}; + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send data first (before any index messages) + fixture.put_data_message({.store = store1, .id = 0}); + + fixture.wait_for_all(); + + // No messages yet + CHECK(fixture.consumed_messages().empty()); + + // Now send index messages + fixture.put_index_message({.index = run1, .msg_id = 1, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 2, .cache = true}); + + fixture.wait_for_all(); + + // Should receive messages now with the cached data + REQUIRE(fixture.consumed_messages().size() == 2); + // The messages are not guaranteed to arrive at a particular order, so we sort by message ID + // before checking. + CHECK(fixture.sorted_message_ids() == std::vector<std::size_t>{1, 2}); + CHECK(fixture.consumed_messages()[0].store == store1); + CHECK(fixture.consumed_messages()[1].store == store1); + + // Now evict the data (we actually test flushing in an earlier test, but we do this to avoid + // any warnings about cached products not being flushed) + fixture.put_flush_token({.index = run1, .count = 2}); + fixture.wait_for_all(); +} + +TEST_CASE("Test warning message if there are cached products", "[multithreading]") +{ + // Setup ostream sink to capture warning message + std::ostringstream oss; + use_ostream_logger(oss); + + // Below, we want to trigger the destruction of the repeater, which will emit a warning if + // there are still cached products that were never flushed. To do this without introducing a + // new scope, we create the fixture as a unique_ptr and then reset it at the end of the test, + // which will invoke the destructor. 
+ auto fixture = + std::make_unique<repeater_test_fixture>("test_repeater_warning_on_cached_products"); + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send index message with cache=true (caching mode) + fixture->put_index_message({.index = run1, .msg_id = 1, .cache = true}); + fixture->put_data_message({.store = store1, .id = 0}); + + fixture->wait_for_all(); + + REQUIRE(fixture->consumed_messages().size() == 1); + + fixture.reset(); // Invoke fixture destructor to trigger warning message + + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached messages: 1")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Product for [run:1]")); +} + +TEST_CASE("Test warning message if there are cached messages", "[multithreading]") +{ + // Setup ostream sink to capture warning message + std::ostringstream oss; + use_ostream_logger(oss); + + // Below, we want to trigger the destruction of the repeater, which will emit a warning if + // there are still cached messages that were never flushed. To do this without introducing a + // new scope, we create the fixture as a unique_ptr and then reset it at the end of the test, + // which will invoke the destructor. 
+ auto fixture = + std::make_unique<repeater_test_fixture>("test_repeater_warning_on_cached_messages"); + + auto run1 = data_cell_index::base_ptr()->make_child(1, "run"); + auto store1 = make_test_store(run1, 42); + + // Send index message with cache=true (caching mode) + fixture->put_index_message({.index = run1, .msg_id = 1, .cache = true}); + + fixture->wait_for_all(); + + REQUIRE( + fixture->consumed_messages().empty()); // No messages yet since data message hasn't been sent + + fixture.reset(); // Invoke fixture destructor to trigger warning message + + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached messages: 1")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Product not yet received")); +} diff --git a/test/unfold.cpp b/test/unfold.cpp index 9baef7b0d..216e6e0bd 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -1,7 +1,7 @@ // ======================================================================================= // This test executes unfolding functionality using the following graph // -// Multiplexer +// Index Router // | // unfold (creates children) // | @@ -11,7 +11,7 @@ // // where the asterisk (*) indicates a fold step. The difference here is that the // *unfold* is responsible for sending the flush token instead of the -// source/multiplexer. +// source/index_router. // ======================================================================================= #include "phlex/core/framework_graph.hpp" diff --git a/test/vector_of_abstract_types.cpp b/test/vector_of_abstract_types.cpp index fbf69aac9..0c225775f 100644 --- a/test/vector_of_abstract_types.cpp +++ b/test/vector_of_abstract_types.cpp @@ -51,5 +51,6 @@ TEST_CASE("Test vector of abstract types") .input_family("sum"_in("event")); g.execute(); + CHECK(g.execution_count("provide_thing") == 1); CHECK(g.execution_count("read_thing") == 1); }