Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ cet_make_library(
detail/make_algorithm_name.cpp
detail/maybe_predicates.cpp
detail/filter_impl.cpp
detail/repeater_node.cpp
edge_creation_policy.cpp
edge_maker.cpp
filter.cpp
framework_graph.cpp
glue.cpp
input_arguments.cpp
message.cpp
message_sender.cpp
node_catalog.cpp
multiplexer.cpp
index_router.cpp
products_consumer.cpp
registrar.cpp
registration_api.cpp
Expand Down Expand Up @@ -61,8 +61,7 @@ install(
graph_proxy.hpp
input_arguments.hpp
message.hpp
message_sender.hpp
multiplexer.hpp
index_router.hpp
node_catalog.hpp
product_query.hpp
products_consumer.hpp
Expand All @@ -74,7 +73,11 @@ install(
)
install(FILES fold/send.hpp DESTINATION include/phlex/core/fold)
install(
FILES detail/make_algorithm_name.hpp detail/maybe_predicates.hpp detail/filter_impl.hpp
FILES
detail/filter_impl.hpp
detail/make_algorithm_name.hpp
detail/maybe_predicates.hpp
detail/repeater_node.hpp
DESTINATION include/phlex/core/detail
)
target_include_directories(phlex_core PRIVATE ${PROJECT_SOURCE_DIR})
Expand Down
130 changes: 75 additions & 55 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "phlex/core/fwd.hpp"
#include "phlex/core/input_arguments.hpp"
#include "phlex/core/message.hpp"
#include "phlex/core/multilayer_join_node.hpp"
#include "phlex/core/product_query.hpp"
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
Expand Down Expand Up @@ -42,6 +43,7 @@

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
};
Expand Down Expand Up @@ -75,81 +77,98 @@
initializer_{std::move(initializer)},
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<R>())},
partition_{std::move(partition)},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto& outputs) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id);

if (not store->is_flush() and not store->index()->parent(partition_)) {
return;
}

if (store->is_flush()) {
// Downstream nodes always get the flush.
get<0>(outputs).try_put(msg);
if (store->index()->layer_name() != partition_) {
return;
}
}

auto const& fold_index =
store->is_flush() ? store->index() : store->index()->parent(partition_);
assert(fold_index);
auto const& id_hash_for_counter = fold_index->hash();

if (store->is_flush()) {
counter_for(id_hash_for_counter).set_flush_value(store, original_message_id);
} else {
call(ft, messages, std::make_index_sequence<N>{});
counter_for(id_hash_for_counter).increment(store->index()->layer_hash());
}

if (auto counter = done_with(id_hash_for_counter)) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(*parent);
++product_count_;
get<0>(outputs).try_put({parent, counter->original_message_id()});
}
}}
flush_receiver_{g,
tbb::flow::unlimited,
[this](flush_message const& msg) -> tbb::flow::continue_msg {
auto const& [index, counts, original_message_id] = msg;
if (index->layer_name() != partition_) {
return {};
}

counter_for(index->hash()).set_flush_value(counts, original_message_id);
emit_and_evict_if_done(index);
return {};
}},
join_{make_join_or_none<N>(
g, full_name(), layers())}, // FIXME: This should change to include result product!
fold_{
g, concurrency, [this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();

auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}

auto const& index_hash_for_counter = fold_index->hash();

call(ft, messages, std::make_index_sequence<N>{});
++calls_;

counter_for(index_hash_for_counter).increment(index->layer_hash());

emit_and_evict_if_done(fold_index);
}}
{
make_edge(join_, fold_);
if constexpr (N > 1ull) {
make_edge(join_, fold_);
}
}

private:
void emit_and_evict_if_done(data_cell_index_ptr const& fold_index)
{
if (auto counter = done_with(fold_index->hash())) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(parent);
++product_count_;
output_port<0>(fold_).try_put({parent, counter->original_message_id()});
}
}

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label);
return receiver_for<N>(join_, input(), product_label, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, fold_);

Check warning on line 140 in phlex/core/declared_fold.hpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_fold.hpp#L140

Added line #L140 was not covered by tests
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& sender() override { return output_port<0ull>(fold_); }
tbb::flow::sender<message>& to_output() override { return sender(); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
{
auto const& parent_id = *most_derived(messages).store->index()->parent(partition_);
auto const parent_index = most_derived(messages).store->index()->parent(partition_);

// FIXME: Not the safest approach!
auto it = results_.find(parent_id);
auto it = results_.find(parent_index->hash());
if (it == results_.end()) {
it =
results_
.insert({parent_id,
.insert({parent_index->hash(),
initialized_object(std::move(initializer_),
std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
.first;
}
++calls_;
return std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);

if constexpr (N == 1ull) {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
}
}

named_index_ports index_ports() final { return join_.index_ports(); }

Check warning on line 171 in phlex/core/declared_fold.hpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_fold.hpp#L171

Added line #L171 was not covered by tests
std::size_t num_calls() const final { return calls_.load(); }
std::size_t product_count() const final { return product_count_.load(); }

Expand All @@ -160,13 +179,13 @@
new R{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
}

void commit_(product_store& store)
auto commit_(product_store_ptr& store)
{
auto& result = results_.at(*store.index());
auto& result = results_.at(store->index()->hash());
if constexpr (requires { send(*result); }) {
store.add_product(output()[0].name(), send(*result));
store->add_product(output()[0].name(), send(*result));
} else {
store.add_product(output()[0].name(), std::move(*result));
store->add_product(output()[0].name(), std::move(result));
}
// Reclaim some memory; it would be better to erase the entire entry from the map,
// but that is not thread-safe.
Expand All @@ -177,9 +196,10 @@
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
product_specifications output_;
std::string partition_;
tbb::flow::function_node<flush_message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index, std::unique_ptr<R>> results_;
tbb::flow::multifunction_node<messages_t<N>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<R>> results_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
Expand Down
14 changes: 0 additions & 14 deletions phlex/core/declared_observer.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#include "phlex/core/declared_observer.hpp"

#include "fmt/std.h"
#include "spdlog/spdlog.h"

namespace phlex::experimental {
declared_observer::declared_observer(algorithm_name name,
std::vector<std::string> predicates,
Expand All @@ -12,15 +9,4 @@ namespace phlex::experimental {
}

declared_observer::~declared_observer() = default;

void declared_observer::report_cached_hashes(
tbb::concurrent_hash_map<data_cell_index::hash_type, bool> const& hashes) const
{
if (hashes.size() > 0ull) {
spdlog::warn("Monitor {} has {} cached hashes.", full_name(), hashes.size());
}
for (auto const& [id, _] : hashes) {
spdlog::debug(" => ID: {}", id);
}
}
}
53 changes: 17 additions & 36 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "phlex/core/fwd.hpp"
#include "phlex/core/input_arguments.hpp"
#include "phlex/core/message.hpp"
#include "phlex/core/multilayer_join_node.hpp"
#include "phlex/core/product_query.hpp"
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
Expand All @@ -16,7 +17,6 @@
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

#include "oneapi/tbb/concurrent_hash_map.h"
#include "oneapi/tbb/flow_graph.h"

#include <concepts>
Expand All @@ -36,12 +36,6 @@ namespace phlex::experimental {
std::vector<std::string> predicates,
product_queries input_products);
virtual ~declared_observer();

protected:
using hashes_t = tbb::concurrent_hash_map<data_cell_index::hash_type, bool>;
using accessor = hashes_t::accessor;

void report_cached_hashes(hashes_t const& hashes) const;
};

using declared_observer_ptr = std::unique_ptr<declared_observer>;
Expand All @@ -50,7 +44,7 @@ namespace phlex::experimental {
// =====================================================================================

template <typename AlgorithmBits>
class observer_node : public declared_observer, private detect_flush_flag {
class observer_node : public declared_observer {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
Expand All @@ -66,61 +60,48 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
join_{make_join_or_none<N>(g, full_name(), layers())},
observer_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
if (store->is_flush()) {
mark_flush_received(store->index()->hash(), message_id);
} else if (accessor a; needs_new(store, a)) {
call(ft, messages, std::make_index_sequence<N>{});
a->second = true;
mark_processed(store->index()->hash());
}

if (done_with(store)) {
cached_hashes_.erase(store->index()->hash());
}
call(ft, messages, std::make_index_sequence<N>{});
++calls_;
return {};
}}
{
make_edge(join_, observer_);
if constexpr (N > 1ull) {
make_edge(join_, observer_);
}
}

~observer_node() { report_cached_hashes(cached_hashes_); }

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label);
return receiver_for<N>(join_, input(), product_label, observer_);
}

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }

bool needs_new(product_store_const_ptr const& store, accessor& a)
std::vector<tbb::flow::receiver<message>*> ports() override
{
if (cached_hashes_.count(store->index()->hash()) > 0ull) {
return false;
}
return cached_hashes_.insert(a, store->index()->hash());
return input_ports<N>(join_, observer_);
}

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
{
++calls_;
return std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
if constexpr (N == 1ull) {
std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
}
}

named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>> observer_;
hashes_t cached_hashes_;
std::atomic<std::size_t> calls_;
};
}
Expand Down
6 changes: 2 additions & 4 deletions phlex/core/declared_output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ namespace phlex::experimental {
detail::output_function_t&& ft) :
consumer{std::move(name), std::move(predicates)},
node_{g, concurrency, [this, f = std::move(ft)](message const& msg) -> tbb::flow::continue_msg {
if (not msg.store->is_flush()) {
f(*msg.store);
++calls_;
}
f(*msg.store);
++calls_;
return {};
}}
{
Expand Down
Loading
Loading