


This section defines some terms commonly used throughout the documentation and the library code:

  • callable: A function pointer, function object, functor, bind expression or lambda. Basically, every f is a callable if f(Args...) is valid for some Args....
  • transformation: A method producing output items from input items. Precisely, it's a callable whose signature matches one of those described in the Transformations section.
  • segment: A not necessarily terminated series of connected transformations.
  • plan: A terminated series of connected transformations: both its input and its output are specified.
  • task: A running segment. Tasks are scheduled on the thread pool.
  • upstream: Generally, the segment connected to the left of a segment. Specifically, it's the input queue of a task.
  • downstream: Generally, the segment connected to the right of a segment. Specifically, it's the output queue of a task.
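
The definition of callable above can be checked at compile time. The following is a minimal sketch using the standard std::is_invocable_v trait (C++17); the names add, negate_functor, make_add_5 and apply_all are illustrative only and are not part of the library.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>

// Illustrative callables; none of these names come from the library.
int add(int a, int b) { return a + b; }

struct negate_functor {
  int operator()(int x) const { return -x; }
};

// A bind expression fixing the first argument of add to 5.
inline auto make_add_5() {
  using namespace std::placeholders;
  return std::bind(add, 5, _1);
}

// Calls one callable of each kind with the same argument and sums the results.
inline int apply_all(int x) {
  int (*function_pointer)(int, int) = add;
  auto lambda = [](int v) { return v * 2; };
  auto bound = make_add_5();
  return function_pointer(x, 1) + negate_functor{}(x) + lambda(x) + bound(x);
}

// "f is a callable if f(Args...) is valid for some Args...":
static_assert(std::is_invocable_v<decltype(&add), int, int>);
static_assert(std::is_invocable_v<negate_functor, int>);
static_assert(!std::is_invocable_v<negate_functor, int, int>);

inline void callable_examples() {
  assert(apply_all(3) == 15);  // (3 + 1) + (-3) + (3 * 2) + (5 + 3)
}
```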

A transformation is a callable which receives one or more input items and produces one or more output items in turn. Depending on exactly how many items are read and produced on each call, transformations have different signatures:

// not exhaustive list of possible signatures
Output one_to_one(const Input& input);
R      one_to_n  (const Input& input,           queue_back<Output>& downstream);
Output n_to_one  (queue_front<Input>& upstream);
R      n_to_m    (queue_front<Input>& upstream, queue_back<Output>& downstream);

Type legend:

  • Input: type of input items
  • Output: type of output items
  • R: arbitrary return type

Input items can be taken as const& (as above) or by value (if it's possible). Queue handles can be taken by value or by reference; a const& doesn't make much sense.
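
To make the difference between the one-to-one and one-to-n shapes concrete, here is a small sketch. It does not use the library: queue_back_sketch is a hypothetical stand-in for queue_back, and its push() member is an assumption made for this illustration, as are split_words and collect_words.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for the library's queue_back handle:
// it merely appends pushed items to a std::deque owned by the caller.
template <typename T>
class queue_back_sketch {
 public:
  explicit queue_back_sketch(std::deque<T>& storage) : storage_(&storage) {}
  void push(T item) { storage_->push_back(std::move(item)); }

 private:
  std::deque<T>* storage_;
};

// one-to-one: exactly one output item per input item, returned by value.
std::size_t length(const std::string& input) { return input.size(); }

// one-to-n: zero or more output items per input item, pushed downstream.
void split_words(const std::string& input,
                 queue_back_sketch<std::string>& downstream) {
  std::string word;
  for (char c : input) {
    if (c == ' ') {
      if (!word.empty()) downstream.push(word);
      word.clear();
    } else {
      word += c;
    }
  }
  if (!word.empty()) downstream.push(word);
}

// Convenience wrapper for the example: runs split_words on a single input.
inline std::deque<std::string> collect_words(const std::string& input) {
  std::deque<std::string> storage;
  queue_back_sketch<std::string> downstream(storage);
  split_words(input, downstream);
  return storage;
}
```

The one-to-one form returns its single output, while the one-to-n form decides on each call how many items to push, which is why it needs a handle to the downstream queue.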

A transformation can be anything which is callable with any of the above arguments. This includes function pointers, function objects, functors, bind expressions and lambdas. Examples:

std::size_t length(const std::string& input);
std::function<std::size_t(const std::string&)> length_f(length);
auto inverse = [] (int input) { return input * (-1); };
auto add_5 = std::bind(add, 5, _1);

Such transformations can be connected to appropriate segments using the | operator. However, there is one restriction on bind expressions. Consider the following example:

// takes the square root of input if it's greater than `threshold`
void sqrt_if_greater(int threshold, queue_front<int>& upstream, queue_back<float>& downstream);

auto sqrt_if_greater_than_5 = std::bind(sqrt_if_greater, 5, _1, _2);

In this case, the library has no way to find out the valid argument list of the bind object's operator() template; it can't guess the value_type of the downstream queue (float). To overcome this limitation, the application must provide a hint:

[Important] Important

If a callable is to form an n-to-m transformation using bind and the input and output types are different, the return type of the callable must match the output type.

This means that, in the example above, sqrt_if_greater must return float rather than void. If the input and output types are the same, the library can do without this hint.
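
The root of the restriction can be reproduced with the standard library alone. The sketch below assumes nothing about the library's internals: queue_front_sketch and queue_back_sketch are empty hypothetical stand-ins for the queue handles, and the body of sqrt_if_greater is a placeholder. It shows that the call operator of a bind object accepts many argument lists, so no single signature can be read from it, while the return type is still available once the arguments are known.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>

// Empty, hypothetical stand-ins for the library's queue handles.
template <typename T> struct queue_front_sketch {};
template <typename T> struct queue_back_sketch {};

int add(int a, int b) { return a + b; }

// Returns float so that the return type can act as the hint described above.
// The body is a placeholder; only the signature matters for this sketch.
float sqrt_if_greater(int, queue_front_sketch<int>&, queue_back_sketch<float>&) {
  return 0.0f;
}

inline auto make_add_5() {
  using namespace std::placeholders;
  return std::bind(add, 5, _1);
}

inline auto make_sqrt_if_greater_than_5() {
  using namespace std::placeholders;
  return std::bind(sqrt_if_greater, 5, _1, _2);
}

using add_5_t = decltype(make_add_5());
using sqrt_5_t = decltype(make_sqrt_if_greater_than_5());

// The bind object's operator() is a template: several argument lists are
// valid, and surplus arguments are silently discarded.
static_assert(std::is_invocable_v<add_5_t&, int>);
static_assert(std::is_invocable_v<add_5_t&, double>);
static_assert(std::is_invocable_v<add_5_t&, int, const char*>);

// Given the right handles, the result type is known and names the output type.
static_assert(std::is_same_v<
    std::invoke_result_t<sqrt_5_t&, queue_front_sketch<int>&,
                         queue_back_sketch<float>&>,
    float>);

inline void bind_examples() {
  assert(make_add_5()(1) == 6);
  assert(make_add_5()(1, "ignored") == 6);  // the extra argument is dropped
}
```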

You might wonder what an actual transformation looks like; take a look at the example/transformations.cpp file.

Sometimes it's not feasible to assemble a pipeline in one go. One might want to build up smaller chunks and connect them dynamically. It's possible to create incomplete plans (i.e. segments) using make():

auto s2 = ppl::make(length) | output;
auto s1 = ppl::from(lines) | trim;
auto s  = s1 | s2;

Segments created by make, such as s2 in the example, are non-terminated (open) segments (include/boost/pipeline/detail/open_segment.hpp) and are not of much use on their own. To run them, they must be connected to an appropriate left-terminated but right-open segment (e.g. s1).

It might be necessary to take segments or plans as arguments, therefore a type is required. So far we have always used the auto keyword for segments, because the actual type is rather complex and internal to the library. However, type-erased handles are provided to refer to them. segment<Input, Output> can refer to any segment or plan which takes Inputs as input and produces Outputs. The tag terminated denotes a closed end. A complete pipeline can be referenced as segment<terminated, terminated> or simply as ppl::plan:

ppl::segment<std::string, std::size_t> s2 = ppl::make(length);
ppl::segment<ppl::terminated, std::string> s1 = ppl::from(lines) | trim;
ppl::segment<ppl::terminated, ppl::terminated> s = s1 | s2 | output;

Thanks to the type-erased handles, it's easy to create interfaces expecting segments or plans:

void execute_plan(ppl::plan& p)
{
  ppl::thread_pool pool{1};
  auto exec =;
}

ppl::segment<ppl::terminated, std::size_t> append_length(const ppl::segment<ppl::terminated, std::string>& s1)
{
  return s1 | length;
}

However, using these aliases does not come for free: it involves allocation of dynamic memory and indirection. It's recommended to use the auto keyword instead wherever it's possible.
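
Where the cost of type erasure comes from can be illustrated with a generic sketch of the technique. This is not the library's implementation: erased_stage is a hypothetical class written for this example, and it only assumes that the handles work along similar lines. The concrete callable is stored behind a heap-allocated object and reached through a virtual call.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical type-erased wrapper around any callable taking const In&
// and returning something convertible to Out.
template <typename In, typename Out>
class erased_stage {
  struct concept_t {
    virtual ~concept_t() = default;
    virtual Out call(const In& input) = 0;  // indirection: virtual dispatch
  };

  template <typename F>
  struct model_t final : concept_t {
    explicit model_t(F f) : f_(std::move(f)) {}
    Out call(const In& input) override { return f_(input); }
    F f_;
  };

  std::unique_ptr<concept_t> impl_;

 public:
  // Dynamic allocation: the concrete callable type F is hidden on the heap.
  template <typename F>
  erased_stage(F f) : impl_(std::make_unique<model_t<F>>(std::move(f))) {}

  Out operator()(const In& input) { return impl_->call(input); }
};

std::size_t length(const std::string& s) { return s.size(); }

// An interface that accepts any stage with this input and output type.
std::size_t measure(erased_stage<std::string, std::size_t>& stage,
                    const std::string& s) {
  return stage(s);
}
```

Both a function pointer and a lambda can be passed to measure through the same handle type, at the price of one allocation per handle and one virtual call per invocation; with auto, neither cost is incurred.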

Items pushed through the pipeline must be transferred from one transformation to the other and stored in queues. This imposes the following constraints on the T type of such items:

  • T must be default constructible
  • T must be movable
  • T must be move assignable

An example of such a type and a verification of these requirements can be found in example/item_type_requirements_test.cpp.
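
The constraints can also be expressed with standard type traits. The following sketch is independent of the example file mentioned above; record, no_default and is_valid_item_v are hypothetical names, and "movable" is read here as "move constructible", which is an assumption.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical move-only item type, used only for illustration.
struct record {
  record() = default;
  record(record&&) = default;
  record& operator=(record&&) = default;
  record(const record&) = delete;
  record& operator=(const record&) = delete;

  std::string payload;
};

// Assumed reading of the three requirements listed above.
template <typename T>
constexpr bool is_valid_item_v = std::is_default_constructible_v<T> &&
                                 std::is_move_constructible_v<T> &&
                                 std::is_move_assignable_v<T>;

// A type without a default constructor fails the first requirement.
struct no_default {
  explicit no_default(int) {}
};

static_assert(is_valid_item_v<record>);
static_assert(is_valid_item_v<int>);
static_assert(!is_valid_item_v<no_default>);

// Moves a record through a second object, as a queue would.
inline std::string move_round_trip(std::string text) {
  record source;
  source.payload = std::move(text);
  record target;                // default construction
  target = std::move(source);   // move assignment
  record final_item(std::move(target));  // move construction
  return final_item.payload;
}

inline void item_examples() {
  assert(move_round_trip("item") == "item");
}
```

Note that copyability is not required by this reading: record deletes its copy operations and still satisfies the check.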

The point of creating pipelines is to turn concurrency into parallelism by breaking a large process into smaller independent tasks and connecting them through message passing. Tasks can then be executed by different threads in parallel, thus achieving better throughput and, if done well, even lower latency.

Unfortunately, the scheduling mechanism currently employed by the library is far from great or clever. Below, the pros and cons of this solution are described, followed by the planned improvements.

Scheduling as done currently

It's challenging to schedule tasks because, using standard tools only, the library is not able to preempt them, therefore they might block for a long time. Also, even with the application writer involved, it's not convenient to yield from one task to another in the same thread, e.g. if the downstream queue is full.

Such concerns can be resolved by using coroutines, but coroutine support is not yet implemented. Until then, the library uses an unbounded synchronized queue to connect segments, a top-down approach to schedule tasks and employs blocking instead of yielding.

This is how it works: when the plan is run, the tasks representing it are submitted to the thread pool, starting at the beginning of the pipeline. Order is important: this way the pool will execute the first pool_size segments. For the reasons above, execution of a segment will not stop until its input queue is closed. If the input queue is empty, the task blocks until a new item is available or the queue gets closed; this way no precious cycles are wasted spinning. The queue is unbounded, so a task never blocks because of a full output queue. Assuming a pipeline of segment_count segments, if segment_count > pool_size, the (pool_size + 1)th segment will run only after the first segment has finished. This might affect latency badly. To avoid this, it's recommended to make the pool at least as large as the pipeline is long.
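
The behaviour described above can be sketched with standard threads. This is not the library's scheduler: blocking_queue, run_task and run_pipeline are hypothetical names, and one std::thread per segment stands in for a thread pool that is as large as the pipeline is long. Each task blocks on an empty input queue, never blocks on output, and stops only when its input queue is closed.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical unbounded, synchronized queue that can be closed.
template <typename T>
class blocking_queue {
 public:
  void push(T item) {  // never blocks on a full queue: there is no bound
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(item));
    }
    ready_.notify_one();
  }

  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    ready_.notify_all();
  }

  // Blocks (without spinning) until an item arrives or the queue is closed;
  // returns an empty optional once the queue is closed and drained.
  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !items_.empty() || closed_; });
    if (items_.empty()) return std::nullopt;
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
  bool closed_ = false;
};

// A task: runs a one-to-one transformation until the upstream queue is
// closed, then closes the downstream queue.
template <typename In, typename Out, typename F>
void run_task(blocking_queue<In>& upstream, blocking_queue<Out>& downstream,
              F transform) {
  while (auto item = upstream.pop()) downstream.push(transform(*item));
  downstream.close();
}

// Two segments, two threads: appends "!" to each line, then takes its length.
std::vector<std::size_t> run_pipeline(const std::vector<std::string>& lines) {
  blocking_queue<std::string> source, middle;
  blocking_queue<std::size_t> sink;
  std::thread first([&] {
    run_task(source, middle, [](const std::string& s) { return s + "!"; });
  });
  std::thread second([&] {
    run_task(middle, sink, [](const std::string& s) { return s.size(); });
  });
  for (const auto& line : lines) source.push(line);
  source.close();
  std::vector<std::size_t> results;
  while (auto value = sink.pop()) results.push_back(*value);
  first.join();
  second.join();
  return results;
}
```

If only one thread were available here, the second task could not start before the first had consumed its entire input, which is the latency problem the paragraph above warns about.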

Planned improvements of scheduling

It's clear that the scheme described above is not optimal. The offending reentrancy constraints can be mitigated by introducing coroutines. Running each task in its own coroutine would make yielding possible whenever the upstream queue is empty or the downstream queue is full, even after some spinning (assuming bounded lockfree queues).

Please refer to the Pipeline.Scheduling slides for more information.
