Skip to content

Commit

Permalink
feat: pass queue-size to input functor
Browse files Browse the repository at this point in the history
  • Loading branch information
strasdat committed Aug 20, 2024
1 parent 793079e commit f6252e2
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
16 changes: 16 additions & 0 deletions cpp/farm_ng/core/pipeline/component.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ Input<TArg>::Input(
std::string const &name,
std::function<void(TArg)> const &f,
InputConfig const &config)
: context_strand_(FARM_UNWRAP(component).getContextStrand()),
uri_(FARM_UNWRAP(component).uri()),
config_(config),
function_([f](TArg arg, size_t){f(arg);}),
count_(0) {
uri_.query = FARM_FORMAT("in={}", name);
}

/// Templated ``Input`` constructor
template <class TArg>
Input<TArg>::Input(
Component const *component,
std::function<void(TArg, size_t)> const &f,
std::string const &name,
InputConfig const &config)
: context_strand_(FARM_UNWRAP(component).getContextStrand()),
uri_(FARM_UNWRAP(component).uri()),
config_(config),
Expand All @@ -86,4 +101,5 @@ Input<TArg>::Input(
uri_.query = FARM_FORMAT("in={}", name);
}


} // namespace farm_ng
25 changes: 19 additions & 6 deletions cpp/farm_ng/core/pipeline/input.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ class Input {
std::function<void(TArg)> const& f,
InputConfig const& config = InputConfig());

explicit Input(
Component const* component,
std::function<void(TArg, size_t)> const& f,
std::string const& name,
InputConfig const& config = InputConfig());

// static Input withQueueSize(
// Component const* component,
// std::string const& name,
// std::function<void(TArg, size_t)> const& f,
// InputConfig const& config = InputConfig());

/// Default destructor
~Input() {}

Expand Down Expand Up @@ -89,18 +101,19 @@ class Input {
// NOTE: seems that `io_context::strand::post` it's deprecated and need
// to be use `boost::asio::post` instead. See:
// https://www.boost.org/doc/libs/1_78_0/doc/html/boost_asio/reference/io_context__strand/post.html
context_strand_.getAsioStrand().post([this, value = std::move(value)] {
this->strandedSlot(std::move(value));
});
context_strand_.getAsioStrand().post(
[this, count, value = std::move(value)] {
this->strandedSlot(std::move(value), count);
});
return true;
}

// strandedSlot is called on the strand associated with ContextStrand
// context_strand_, and it can be a generic place to track things like
// function execution time, and manage the queue length.
void strandedSlot(TArg value) {
void strandedSlot(TArg value, size_t queue_size) {
// begin timing
function_(std::move(value));
function_(std::move(value), queue_size);
// end timing
// e.g. context_strand_.report_timing(input_name_, timing)
count_--;
Expand All @@ -113,7 +126,7 @@ class Input {
/// The configuration structure for the class instance.
InputConfig config_;
/// The callback function definition.
std::function<void(TArg)> function_;
std::function<void(TArg, size_t)> function_;
/// The list contained connections between other functions.
std::vector<std::shared_ptr<boost::signals2::scoped_connection>> connections_;
/// A counter tracking the number of calls made to the function.
Expand Down

0 comments on commit f6252e2

Please sign in to comment.