The following (incomplete) example shows a simple way to use the pipeline with from():
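```cpp
using std::placeholders::_1;
using std::placeholders::_2;

// input, trim and grep are defined in the full program (example/hello.cpp)
auto grep_error = std::bind(grep, "Error.*", _1, _2);
boost::pipeline::thread_pool pool;
std::vector<std::string> output;

auto execution = (boost::pipeline::from(input)
  | trim
  | grep_error
  | [] (const std::string& line) { return "-> " + line; }
  | output
).run(pool);
```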
This feeds the contents of input to the pipeline, which trims whitespace from each item, selects those that start with "Error", prepends an arrow mark to them and then puts them into output. The whole program can be found in the example/hello.cpp file. This snippet is only intended to give a quick glimpse of the interface; the Tutorial section walks through building a pipeline step by step.
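To see exactly what the example computes, the following sequential sketch performs the same three steps (trim, keep lines matching "Error.*", prepend an arrow) with the standard library only and without any threads. It is not the library's implementation: trim_item and run_hello_sequentially are hypothetical names, and the sketch assumes that grep keeps the items that fully match the regular expression.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <regex>
#include <string>
#include <vector>

// Hypothetical stand-in for the trim transformation: strips leading and
// trailing whitespace from a single item (one-to-one).
std::string trim_item(const std::string& s) {
  auto not_space = [](unsigned char c) { return !std::isspace(c); };
  auto first = std::find_if(s.begin(), s.end(), not_space);
  auto last = std::find_if(s.rbegin(), s.rend(), not_space).base();
  return first < last ? std::string(first, last) : std::string();
}

// Sequential equivalent of: from(input) | trim | grep_error | arrow | output
std::vector<std::string> run_hello_sequentially(const std::vector<std::string>& input) {
  const std::regex error_re("Error.*");
  std::vector<std::string> output;
  for (const std::string& raw : input) {
    std::string line = trim_item(raw);                // trim
    if (!std::regex_match(line, error_re)) continue;  // grep_error: keep matches only
    output.push_back("-> " + line);                   // prepend the arrow mark
  }
  return output;
}
```

In the real pipeline each of these steps runs as a separate segment scheduled on the thread pool, so items can flow through trim and grep concurrently instead of one loop iteration at a time.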
This section explains the various parts of a pipeline and shows how to build them. The example snippets below assume the following include and namespace alias:
```cpp
#include <boost/pipeline.hpp>

namespace ppl = boost::pipeline;
```
Every complete pipeline requires a source of input. Such a source can be a container, a range, a queue or a generator. A container or a range is an input of a fixed size: once it is assigned, it cannot (or should not) be changed later. The queue is specific to this library: it supports concurrent reads and writes, so items can be added to it even after the pipeline has been launched. A generator is either a function (1) or a callable object with a call operator (2) of the following form:
```cpp
R generator(ppl::queue_back<T> downstream);   // (1) free function
R operator()(ppl::queue_back<T> downstream);  // (2) call operator of a callable object
```
The generator feeds the underlying queue through queue_back and returns when there are no more items to be processed. Each of these kinds of input can be turned into the beginning of a pipeline using from():
```cpp
ppl::from(container)                           // takes begin and end
ppl::from(container.begin(), container.end())  // same as above
ppl::from(queue)                               // reference is taken
ppl::from(generator)                           // generator gets copied
```
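The queue and generator semantics described above can be modeled with a small closable queue. The sketch below is a hypothetical model built on the standard library, not boost::pipeline's own classes: sketch::queue, sketch::queue_back, count_to_five and drain_generator are illustrative names. It assumes that the queue is closed once the generator returns, so the reading side knows no more items will arrive.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {  // hypothetical model, not boost::pipeline

// Minimal thread-safe queue: producers push and eventually close it;
// consumers pop until it is closed and drained.
template <typename T>
class queue {
 public:
  void push(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until an item is available; returns std::nullopt once the
  // queue is closed and empty.
  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty() || closed_; });
    if (items_.empty()) return std::nullopt;
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> items_;
  bool closed_ = false;
};

// Write-only view handed to a generator (plays the role of queue_back).
template <typename T>
class queue_back {
 public:
  explicit queue_back(queue<T>& q) : q_(&q) {}
  void push(T item) { q_->push(std::move(item)); }

 private:
  queue<T>* q_;
};

}  // namespace sketch

// A generator shaped like signature (1): it pushes items downstream and
// returns when it has nothing more to produce.
void count_to_five(sketch::queue_back<int> downstream) {
  for (int i = 1; i <= 5; ++i) downstream.push(i);
}

// Runs the generator on its own thread and closes the queue when it returns,
// while the calling thread drains the queue concurrently.
std::vector<int> drain_generator() {
  sketch::queue<int> q;
  std::thread producer([&q] {
    count_to_five(sketch::queue_back<int>(q));
    q.close();  // the generator returned: no more items will arrive
  });
  std::vector<int> received;
  while (auto item = q.pop()) received.push_back(*item);
  producer.join();
  return received;
}
```

Because reads and writes are synchronized, the reader can start pulling items before the generator has finished, which is what allows a queue-backed input to keep growing after the pipeline is launched.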
Please refer to the API documentation to learn more about using from(). Transformations can then be connected to the returned segment to form a pipeline.
Transformations are the meat of a pipeline; input segments created by from() are of little use by themselves. A transformation is a callable (a function pointer, a function object, a bind expression, a functor or a lambda) that receives one or more input items on each call and produces one or more output items in turn. Supported signatures include:
```cpp
Output one_to_one(const Input& input);
R      one_to_n  (const Input& input, queue_back<Output>& downstream);
Output n_to_one  (queue_front<Input>& upstream);
R      n_to_m    (queue_front<Input>& upstream, queue_back<Output>& downstream);
```
In none of the above cases can Output be void. Please refer to the Transformations section to read more about them.
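To make the four shapes concrete, here is one function of each kind. They are written against hypothetical single-threaded stand-ins (front_view and back_view) rather than the library's queue_front and queue_back, whose member functions are not shown in this section; how many items the n-to-one example pulls per call is a choice of the sketch.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <vector>

// Hypothetical single-threaded stand-ins for queue_front / queue_back.
template <typename T>
struct front_view {  // upstream: items can be pulled
  std::deque<T>& items;
  std::optional<T> pop() {
    if (items.empty()) return std::nullopt;
    T item = items.front();
    items.pop_front();
    return item;
  }
};

template <typename T>
struct back_view {  // downstream: items can be pushed
  std::vector<T>& items;
  void push(const T& item) { items.push_back(item); }
};

// one-to-one: one input, exactly one output per call
int square(const int& x) { return x * x; }

// one-to-n: one input, any number of outputs pushed downstream
void split_words(const std::string& line, back_view<std::string>& downstream) {
  std::string word;
  for (char c : line) {
    if (c == ' ') {
      if (!word.empty()) downstream.push(word);
      word.clear();
    } else {
      word += c;
    }
  }
  if (!word.empty()) downstream.push(word);
}

// n-to-one: pulls several inputs (here: up to two), returns one output
int sum_pair(front_view<int>& upstream) {
  int total = 0;
  for (int i = 0; i < 2; ++i) {
    if (auto x = upstream.pop()) total += *x;
  }
  return total;
}

// n-to-m: pulls until upstream is exhausted, pushes any number of outputs
void running_sum(front_view<int>& upstream, back_view<int>& downstream) {
  int total = 0;
  while (auto x = upstream.pop()) {
    total += *x;
    downstream.push(total);
  }
}
```

The split is by who controls the pace: one-to-one and one-to-n transformations are handed one item at a time, while n-to-one and n-to-m transformations pull from upstream themselves.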
Transformations are applied to the input with the | operator. Each use of the operator creates a new segment, so the operator is chainable:
```cpp
ppl::from(input) | mod_seven | even_only | add_two;
```
See example/tutorial.cpp for more. After all the required transformations are connected, the output must be described.
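Chaining works because each | returns a new segment that can itself be the left operand of another |. The eager, single-threaded model below illustrates only that property. The segment type, from and operator| here are hypothetical, and treating a transformation that returns std::optional as a filter is an assumption of the sketch (the document does not show how even_only is written).

```cpp
#include <cassert>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical eager model: every `|` returns a NEW segment holding the
// transformed items. The real library builds a concurrent pipeline instead.
template <typename T>
struct segment {
  std::vector<T> items;
};

template <typename T>
segment<T> from(std::vector<T> input) { return segment<T>{std::move(input)}; }

template <typename T>
struct is_optional : std::false_type {};
template <typename T>
struct is_optional<std::optional<T>> : std::true_type {};

// A callable returning std::optional<U> acts as a filter (nullopt drops the
// item); any other return type maps one item to exactly one item.
template <typename T, typename F>
auto operator|(const segment<T>& in, F f) {
  using R = std::invoke_result_t<F, const T&>;
  if constexpr (is_optional<R>::value) {
    segment<typename R::value_type> out;
    for (const T& x : in.items)
      if (auto y = f(x)) out.items.push_back(*y);
    return out;
  } else {
    segment<R> out;
    for (const T& x : in.items) out.items.push_back(f(x));
    return out;
  }
}

// Illustrative transformations named after the ones in the text.
int mod_seven(const int& x) { return x % 7; }
std::optional<int> even_only(const int& x) {
  return x % 2 == 0 ? std::optional<int>(x) : std::nullopt;
}
int add_two(const int& x) { return x + 2; }
```

Usage: from(std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8, 9}) | mod_seven | even_only | add_two yields a segment whose items are 4, 6, 8, 2, 4.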
The pipeline is incomplete until the output is specified. The output can be a container, a queue, or a consumer. A consumer is a callable receiving a queue_front, from which it can pull items until there are no more. Usually the library can recognize when an output segment is specified; however, a consumer that returns a value has the same shape as an n-to-one transformation and cannot be told apart from one. In such cases, use to():
```cpp
void consumer_a(queue_front<int> upstream); // returns void
int  consumer_b(queue_front<int> upstream); // returns int, might be mistaken for an n-to-one transformation

segment | container;            // container is recognized as output, taken by reference
segment | queue;                // same as above
segment | consumer_a;           // definitely a consumer, gets copied
segment | ppl::to(consumer_b);  // to() is required for it to be recognized as a consumer
```
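The ambiguity that to() resolves can be illustrated with a compile-time check: a callable that returns void can only be a consumer, while one that returns a value has the same shape as an n-to-one transformation unless it is explicitly marked. Everything in this sketch (upstream_view, consumer_tag, to_consumer, classify, attach_output) is hypothetical and only mirrors the rule stated above; it is not how boost::pipeline implements to().

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <type_traits>
#include <utility>

// Hypothetical pull-side view over buffered items (stand-in for queue_front).
template <typename T>
struct upstream_view {
  std::deque<T>& items;
  std::optional<T> pop() {
    if (items.empty()) return std::nullopt;
    T item = items.front();
    items.pop_front();
    return item;
  }
};

// Hypothetical marker playing the role of ppl::to(): "this is an output".
template <typename F>
struct consumer_tag { F f; };

template <typename F>
consumer_tag<F> to_consumer(F f) { return {std::move(f)}; }

// Classification rule: a void return means consumer; any other return type
// looks like an n-to-one transformation unless wrapped in consumer_tag.
template <typename F>
struct classify {
  static constexpr bool is_consumer =
      std::is_void_v<std::invoke_result_t<F&, upstream_view<int>&>>;
};
template <typename F>
struct classify<consumer_tag<F>> {
  static constexpr bool is_consumer = true;
};

template <typename F>
void invoke_consumer(F& f, upstream_view<int>& view) { f(view); }
template <typename F>
void invoke_consumer(consumer_tag<F>& c, upstream_view<int>& view) { c.f(view); }

// Attaches an output; value-returning callables are rejected at compile time.
template <typename F>
void attach_output(std::deque<int>& items, F f) {
  static_assert(classify<F>::is_consumer,
                "returns a value, so it looks like an n-to-one transformation; "
                "wrap it in to_consumer()");
  upstream_view<int> view{items};
  invoke_consumer(f, view);
}

int consumed_sum = 0;  // global sink, only to keep the sketch short

// Returns void: can only be an output.
void consumer_a(upstream_view<int>& upstream) {
  while (auto x = upstream.pop()) consumed_sum += *x;
}

// Returns int: same shape as an n-to-one transformation.
int consumer_b(upstream_view<int>& upstream) {
  int count = 0;
  while (upstream.pop()) ++count;
  return count;
}
```

With this rule, attach_output(items, consumer_a) compiles as is, while attach_output(items, consumer_b) fails the static_assert and attach_output(items, to_consumer(consumer_b)) is accepted.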
Now the pipeline is assembled, but it has not been run yet. To run a pipeline, its run() method must be called with a thread pool. This method schedules the transformations on the pool and returns an execution object, a handle that can be queried or waited on:
```cpp
ppl::thread_pool pool{4}; // add 4 threads to the pool

ppl::execution exec =
  (ppl::from(input) | mod_seven | even_only | add_two | output).run(pool);

exec.wait();                // blocks until the pipeline is finished
bool done = exec.is_done(); // done == true
```
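The contract of the execution handle, where wait() blocks and is_done() reports completion without blocking, can be approximated with std::future. In this hypothetical sketch execution_sketch and run_add_two are illustrative names, and std::async stands in for scheduling the work on a thread pool.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <utility>
#include <vector>

// Hypothetical stand-in for the handle returned by run(): wraps a future.
class execution_sketch {
 public:
  explicit execution_sketch(std::future<void> done) : done_(std::move(done)) {}

  // Blocks until the work has finished.
  void wait() const { done_.wait(); }

  // Polls without blocking: a zero timeout only checks the current state.
  bool is_done() const {
    return done_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
  }

 private:
  std::future<void> done_;
};

// Runs a trivial "pipeline" (add two to each item) on another thread.
// input and output are captured by reference, so the caller must keep them
// alive and must not read output before wait() returns.
execution_sketch run_add_two(const std::vector<int>& input, std::vector<int>& output) {
  return execution_sketch(std::async(std::launch::async, [&input, &output] {
    for (int x : input) output.push_back(x + 2);
  }));
}
```

As with the execution object in the text, the output container is filled asynchronously; reading it is only safe after wait() has returned.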
Please take a look at the Scheduling section to learn how to size a thread pool to avoid deadlocks.