
Quick Start


The following (incomplete) example shows a simple way to utilize the pipeline using from():

auto grep_error = std::bind(grep, "Error.*", _1, _2);

boost::pipeline::thread_pool pool;
std::vector<std::string> output;

auto execution =
  (boost::pipeline::from(input)
    | trim
    | grep_error
    | [] (const std::string& item) { return "-> " + item; }
    | output
  ).run(pool);

This feeds the contents of input into the pipeline, which trims whitespace from each item, selects those that start with "Error", prepends an arrow to each, and then puts them into output. The whole program is in the example/hello.cpp file. This snippet is intended to give a quick glimpse of the interface; the Tutorial section walks through building a pipeline.

This section explains the various parts of a pipeline and shows how to build them. The example snippets below assume the following include and namespace alias:

#include <boost/pipeline.hpp>
namespace ppl = boost::pipeline;


Every complete pipeline requires a source of input. Such a source can be a container, a range, a queue or a generator. A container or a range is an input of fixed size: once it is assigned, it cannot (or should not) be changed later. The queue is specific to this library; it supports concurrent reads and writes, so items can be added to it even after the pipeline is launched. The generator is a function (1) or a callable object with operator() (2):

R generator(ppl::queue_back<T> downstream); // (1)
R operator()(ppl::queue_back<T> downstream); // (2)

The generator function feeds the underlying queue through queue_back and returns when there are no more items to be processed. All three kinds of input (container or range, queue, and generator) can be turned into the beginning of a pipeline using from():

ppl::from(container) // takes begin and end
ppl::from(container.begin(), container.end()) // same as above
ppl::from(queue) // reference is taken
ppl::from(generator) // generator gets copied
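
To make the two generator forms concrete, here is a minimal sketch that does not use the library at all. The queue_back class below is a hypothetical, single-threaded stand-in for ppl::queue_back, the push() method name is an assumption, and collect() is an illustrative driver that is not part of the API. It only shows the contract described above: a generator pushes its items downstream and returns when it has nothing more to produce.

```cpp
#include <cassert>
#include <deque>
#include <vector>

namespace sketch {

// Hypothetical stand-in for ppl::queue_back<T>: a write-only handle to a queue.
// Unlike the library's queue, this one is not thread-safe.
template <typename T>
class queue_back {
public:
  explicit queue_back(std::deque<T>& storage) : storage_(&storage) {}
  void push(const T& item) { storage_->push_back(item); }  // assumed method name
private:
  std::deque<T>* storage_;
};

// Form (1): a plain function that pushes its items and then returns.
void one_two_three(queue_back<int> downstream) {
  for (int i = 1; i <= 3; ++i) downstream.push(i);
}

// Form (2): a callable object; the upper bound is state carried by the object.
struct count_to {
  int limit;
  void operator()(queue_back<int> downstream) const {
    assert(limit >= 0 && "limit must be non-negative");
    for (int i = 1; i <= limit; ++i) downstream.push(i);
  }
};

// Hypothetical driver: runs a generator to completion and returns what it produced.
template <typename Generator>
std::vector<int> collect(Generator generator) {
  std::deque<int> storage;
  generator(queue_back<int>(storage));
  return std::vector<int>(storage.begin(), storage.end());
}

}  // namespace sketch
```

With these definitions, sketch::collect(sketch::count_to{5}) yields the items 1 through 5. In the real library the generator would be handed to ppl::from() instead of a hand-written driver.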

Please refer to the API documentation to learn more about using from(). Transformations can be connected later to the returned segment to form a pipeline:

Connecting transformations

Transformations are the meat of a pipeline. Input segments created by from() are not of much use by themselves. A transformation is a callable (a function pointer, a function object, a bind expression or a lambda) that receives one or more input items on each call and produces one or more output items in turn. The signatures of such transformations include:

Output one_to_one(const Input& input);
R      one_to_n  (const Input& input,           queue_back<Output>& downstream);
Output n_to_one  (queue_front<Input>& upstream);
R      n_to_m    (queue_front<Input>& upstream, queue_back<Output>& downstream);

In none of the above cases can Output be void. Please refer to the Transformations section to read more about them.
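
The signature shapes listed above can be illustrated with a small sketch that is independent of the library. The queue_back and queue_front classes are hypothetical single-threaded stand-ins, the push() and try_pop() method names are assumptions, and run_all() is an illustrative helper that applies the callables one after another rather than concurrently.

```cpp
#include <cassert>
#include <deque>
#include <vector>

namespace sketch {

// Hypothetical stand-ins for the library's queue handles (not thread-safe).
template <typename T>
class queue_back {
public:
  explicit queue_back(std::deque<T>& storage) : storage_(&storage) {}
  void push(const T& item) { storage_->push_back(item); }  // assumed name
private:
  std::deque<T>* storage_;
};

template <typename T>
class queue_front {
public:
  explicit queue_front(std::deque<T>& storage) : storage_(&storage) {}
  // Copies the next item into `out`; returns false once the queue is drained.
  bool try_pop(T& out) {  // assumed name
    if (storage_->empty()) return false;
    out = storage_->front();
    storage_->pop_front();
    return true;
  }
private:
  std::deque<T>* storage_;
};

// one-to-one: exactly one output per input.
int add_two(const int& input) { return input + 2; }

// one-to-n: the callable decides how many items to push (here: zero or one).
void even_only(const int& input, queue_back<int>& downstream) {
  if (input % 2 == 0) downstream.push(input);
}

// n-to-one: pulls as many inputs as it needs and returns a single value.
int sum(queue_front<int>& upstream) {
  int total = 0;
  int item = 0;
  while (upstream.try_pop(item)) total += item;
  return total;
}

// Illustrative helper: even_only, then add_two, then sum, run sequentially.
int run_all(const std::vector<int>& inputs) {
  std::deque<int> evens;
  queue_back<int> back(evens);
  for (int x : inputs) even_only(x, back);
  assert(evens.size() <= inputs.size());  // a filter never adds items

  std::deque<int> shifted;
  for (int x : evens) shifted.push_back(add_two(x));

  queue_front<int> front(shifted);
  return sum(front);
}

}  // namespace sketch
```

For the input {1, 2, 3, 4}, even_only keeps 2 and 4, add_two turns them into 4 and 6, and sum returns 10.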

Applying transformations on the input is done by the | operator. Each use of the operator creates a new segment and therefore it's chainable:

ppl::from(input) | mod_seven | even_only | add_two;
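
Why returning a new segment makes the operator chainable can be seen in a toy model. The sketch below is not how the library is implemented: segment, from() and operator| here are hypothetical, eager, single-threaded stand-ins that handle only one-to-one transformations on int.

```cpp
#include <cassert>
#include <vector>

namespace sketch {

// Hypothetical eager segment: simply holds the items produced so far.
struct segment {
  std::vector<int> items;
};

// Hypothetical counterpart of from(): wraps a container in a segment.
inline segment from(const std::vector<int>& input) { return segment{input}; }

// Each application returns a brand-new segment, so the result can itself
// appear on the left-hand side of another `|`.
template <typename Transform>
segment operator|(const segment& upstream, Transform transform) {
  segment next;
  for (int x : upstream.items) next.items.push_back(transform(x));
  assert(next.items.size() == upstream.items.size());  // one-to-one only
  return next;
}

inline int mod_seven(int x) { return x % 7; }
inline int add_two(int x) { return x + 2; }

}  // namespace sketch
```

In this model, sketch::from({8, 9}) | sketch::mod_seven | sketch::add_two produces a segment holding 3 and 4. The library's segments, by contrast, only describe the work; nothing is processed until the pipeline is run.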

See example/tutorial.cpp for more. After all the required transformations are connected, the output must be described:

Specifying pipeline output

The pipeline is incomplete until the output is specified. The output can be a container, a queue, or a consumer. A consumer is a callable that receives a queue_front, from which it pulls items until there are no more. Usually the library can recognize when an output segment is specified. However, a consumer that returns a value cannot be told apart from an n-to-one transformation. For such cases, use to():

void consumer_a(queue_front<int> upstream); // returns void
int  consumer_b(queue_front<int> upstream); // returns int, might be mistaken for an n-to-one transformation

segment | container; // container is recognized as output, taken by reference
segment | queue;     // same as above
segment | consumer_a;          // definitely a consumer, gets copied
segment | ppl::to(consumer_b); // `to()` must be used to be recognized as consumer
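
As a rough illustration of what a consumer body might look like, the following sketch drains a queue handle until it is empty. It does not use the library: queue_front is a hypothetical single-threaded stand-in, try_pop() is an assumed method name, and collect_into and count_items are illustrative consumers.

```cpp
#include <cassert>
#include <deque>
#include <vector>

namespace sketch {

// Hypothetical stand-in for the library's queue_front<T> (not thread-safe).
template <typename T>
class queue_front {
public:
  explicit queue_front(std::deque<T>& storage) : storage_(&storage) {}
  // Copies the next item into `out`; returns false once the queue is drained.
  bool try_pop(T& out) {  // assumed name
    if (storage_->empty()) return false;
    out = storage_->front();
    storage_->pop_front();
    return true;
  }
private:
  std::deque<T>* storage_;
};

// A consumer returning void: unambiguous by its signature.
struct collect_into {
  std::vector<int>* sink;
  void operator()(queue_front<int> upstream) const {
    assert(sink != nullptr);
    int item = 0;
    while (upstream.try_pop(item)) sink->push_back(item);
  }
};

// A consumer returning a value: judged by its signature alone it looks like
// an n-to-one transformation, which is the case to() is meant to resolve.
int count_items(queue_front<int> upstream) {
  int count = 0;
  int item = 0;
  while (upstream.try_pop(item)) ++count;
  return count;
}

}  // namespace sketch
```

Both callables pull until the handle reports that nothing is left; only the return type differs, and that difference is what decides whether to() is needed.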

Now our pipeline is assembled and ready to run:

Running the pipeline

So far the pipeline has only been assembled, not run. To run a pipeline, call its run() method with a thread pool. This method schedules the transformations on the pool and returns an execution object. This handle can be queried or waited on:

ppl::thread_pool pool{4}; // add 4 threads to the pool
ppl::execution exec = (ppl::from(input) | mod_seven | even_only | add_two | output).run(pool);
exec.wait(); // blocks until the pipeline is finished
bool done = exec.is_done(); // done == true

Please take a look at the Scheduling section to learn how to size a thread pool to avoid deadlocks.
