C++ Pipelines

ISO/IEC JTC1 SC22 WG21 N3534 = 2013-03-15

Adam Berkan, aberkan@google.com, adam.berkan@gmail.com
Alasdair Mackintosh, alasdair@google.com, alasdair.mackintosh@gmail.com

Introduction
Problem
Solution
Proposal - Pipelines in C++
Parallel Pipelines
Abstractions
    Pipeline Segments
    Pipeline Plans
    Pipeline Executions
Interface Details
    Pipeline Segments
    Pipeline Plans
    Running in Parallel
    Pipeline Executions
Additional Features

Introduction - Multithreaded Pipelines in the Unix Shell

One of the simplest ways to write a multi-threaded application is to use the Unix shell:


# Get all error messages in the log, filter out the test account, and format them:
cat log.txt | grep '^Error:' | grep -v 'test@example.com' |
  sed 's/^Error:.*Message: //' > output.txt
This is a multi-threaded pipeline. Each of the operations is running independently in its own process, and there are no potential deadlocks, data races or undefined behaviours.

Although not all multi-threaded operations can be performed in this way, we believe that the programming approach used here (simple tasks, each performing a self-contained operation in its own thread) can be used to produce robust and reliable applications.

Problem

Writing multi-threaded applications by hand in C++, using mutexes and threading primitives, is difficult and error prone. Many such tasks would be simpler if the application logic could be separated out into small independent sections, but there is no straightforward, thread-safe way of joining such sections together.

Solution

We propose a C++ pipeline library that allows application programmers to combine simple data transformations into a complete multithreaded data-processing pipeline. Individual transformation functions are isolated from each other, and may be run in parallel. We use a pipe syntax that should be familiar to users of Unix or Microsoft shells.

(pipeline::from(input_queue) |
  bind(grep, "^Error") |
  bind(vgrep, "test@example.com") |
  bind(sed, "s/^Error:.*Message: //") |
  output_queue).run(&threadpool);
Although not universally applicable, we believe that this approach will simplify the writing of certain classes of data-processing applications.

Proposal - Pipelines in C++

A pipeline is made up of functions that read data from an input queue, transform it in some way, and write it to an output queue. (The pipeline framework makes use of the proposed C++ queue and executor libraries. See "C++ Concurrent Queues, ISO/IEC JTC1 SC22 WG21 N3353 12-0043 - 2012-01-14" and "A preliminary proposal for work executors, ISO/IEC JTC1 SC22 WG21 N3378=12-0068")

The overloaded pipe operators ('|') that separate stages in the pipeline represent concurrent queues that buffer the output of one stage and pass it to the input of the next stage. Consider the example shown in the previous section, where we have a 'grep' function that reads a sequence of strings and outputs those values that match a filter. If we wrote the 'grep' function explicitly, using input and output queues, it would be of the form:


void grep_error(queue_back<string> in, queue_front<string> out) {
  string item;
  while (in.wait_pop(item) == queue_op_status::success) {
    if (re::match("^Error", item)) {
      out.push(item);
    }
  }
}

The 'vgrep' function would be similar, while the 'sed' function would apply a transform to each input.
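A 'vgrep_test' stage, for example, might be written as follows (a sketch mirroring grep_error, with the match inverted):

void vgrep_test(queue_back<string> in, queue_front<string> out) {
  string item;
  while (in.wait_pop(item) == queue_op_status::success) {
    if (!re::match("test@example.com", item)) {
      out.push(item);
    }
  }
}

The 'sed_message' stage would then be: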


void sed_message(queue_back<string> in, queue_front<string> out) {
  string item;
  while (in.wait_pop(item) == queue_op_status::success) {
    re::apply("s/^Error:.*Message: //", item);
    out.push(item);
  }
}
Given these functions, we can then run the pipeline as follows:

(pipeline::from(input_queue) | grep_error |
  vgrep_test |  sed_message | output_queue).run(&thread_pool);

It would be better to write general-purpose functions, which we can do concisely by binding additional parameters when we construct the pipeline. The generalized functions would be:


void grep(const string& re, queue_back<string> in,
          queue_front<string> out) {
  string item;
  while (in.wait_pop(item) == queue_op_status::success) {
    if (re::match(re, item)) {
      out.push(item);
    }
  }
}

void vgrep(const string& re, queue_back<string> in,
           queue_front<string> out) {
  ...
}

void sed(const string& re, queue_back<string> in,
         queue_front<string> out) {
  string item;
  while (in.wait_pop(item) == queue_op_status::success) {
    re::apply(re, item);
    out.push(item);
  }
}
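
We would then invoke the pipeline with specific values, binding the regular expressions when the pipeline is constructed, as in the earlier example:

(pipeline::from(input_queue) |
  bind(grep, "^Error") |
  bind(vgrep, "test@example.com") |
  bind(sed, "s/^Error:.*Message: //") |
  output_queue).run(&thread_pool);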

To avoid boilerplate code for processing input and output queues, we would prefer to write just the data-transformation logic. When each input is processed into a single output, we can simply write:


string sed(const string& re, string item) {
  re::apply(re, item);
  return item;
}

A more common case is to have one input that maps to a variable number of outputs. In that case we will have a single input argument but still use an output queue.


void grep(const string& re, string item, queue_front<string> out) {
  if (re::match(re, item)) {
    out.push(item);
  }
}

In each case the pipeline framework will wrap these functions in the appropriate queue-handling logic.
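For illustration only (this helper is not part of the proposed interface), the wrapper generated for a one-input, one-output function such as the simplified 'sed' above might look roughly like:

// Hypothetical wrapper sketch: repeatedly pop an input, apply the user's
// transformation, and push the result, until the input queue is closed.
template <typename In, typename Out, typename Function>
void run_simple_stage(Function f, queue_back<In> in, queue_front<Out> out) {
  In item;
  while (in.wait_pop(item) == queue_op_status::success) {
    out.push(f(item));
  }
}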

Note that although the above examples use strings as input and output types, the actual implementation is fully templated, and will work with any queueable type.

Beyond Shell Scripts - Parallel Pipelines

The previous example uses a single file as input. What if we are reading from multiple files? We can add an extra stage to the pipeline:

void read_file(const string& filename, queue_front<string> out) {
  File file(filename);
  for (string line : file.readLines()) {
    out.push(line);
  }
}

queue<string> filenames = ...

pipeline::execution task = (
  pipeline::from(filenames) |
  read_file | grep_fn | vgrep_fn | sed_fn |
  output_queue).run(&thread_pool);

In this case, the read_file function will process each file sequentially. However, if these files were stored on multiple devices it might be faster to read them in parallel.

pipeline::execution task = (
  pipeline::from(filenames) |
  pipeline::parallel(read_file | grep_fn | vgrep_fn | sed_fn, 8) |
  output_queue).run(&thread_pool);
This will run 8 separate pipelines in parallel. Each pipeline will read the next file from the input queue, open it, process it, and write the result into the same output queue. Alternatively, if we only want to run the first part of the process in parallel, we can write:

pipeline::execution task = (
  pipeline::from(filenames) |
  pipeline::parallel(read_file | grep_fn, 8) | vgrep_fn | sed_fn |
  output_queue).run(&thread_pool);
This will run 8 parallel segments that read input files and perform the initial 'grep' filtering, but will then write into a single thread running the vgrep function.
Note that each parallel segment runs independently, and will have independent queues within each stage.

Pipeline Abstractions

The pipeline framework has several classes that represent important abstractions:

Pipeline Segments

A segment represents a series of connected operations. It is analogous to a shell function containing a series of filter commands:


function f1() {
  grep '^Error:' | grep -v "test@example.com" | sed 's/^Error:.*Message: //'
}
The C++ equivalent would be:

pipeline::segment<string, string> s1 = pipeline::make(grep_fn) | vgrep_fn | sed_fn;
Segments can themselves be joined together to create longer segments. In a shell:

function f3() {
  f1 | f2
}

In C++:


pipeline::segment<string, int> s2 = ...
pipeline::segment<string, int> s3 = s1 | s2;

Pipeline Plans

The script in the above example cannot be run in isolation: it needs an input and an output. Unless we plan to use stdin and stdout, we would normally invoke the function with input and output files:


function plan() {
  cat log.txt | f1 > output.txt
}
In C++ we use queues as input and output. (There is no explicit stdin/stdout).

pipeline::plan plan = pipeline::from(input_queue) | s1 | output_queue;
In addition to queues, we can also use producer and consumer functions as input and output:

void read_file(const string& filename, queue_front<string> out) {
  File file(filename);
  for (string line : file.readLines()) {
    out.push(line);
  }
}
pipeline::plan plan = pipeline::from(bind(read_file, "log.txt")) | s1 | output_queue;

Pipeline Executions

A pipeline plan can be executed by providing it a threadpool in which to run. This is analogous to executing the plan function at the shell prompt, and allowing it to run in the background:


$> plan &
[1] 2263
In C++:

pipeline::execution execn = plan.run(&threadpool);
if (execn.is_done()) ...
execn.wait();
Creating a pipeline execution will cause the pipeline plan to start running in the given threadpool. It will continue to run until the input queue is closed, and will then close the output queue when all input has been processed.
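
A minimal end-to-end sketch of this lifetime, assuming the close() operation from the proposed concurrent queue library and the segment s1 defined above:

queue<string> input_queue;
queue<string> output_queue;

pipeline::plan plan = pipeline::from(input_queue) | s1 | output_queue;
pipeline::execution execn = plan.run(&threadpool);

input_queue.push("Error: ... Message: disk full");
input_queue.close();  // signal end of input so the pipeline can finish
execn.wait();         // returns once all input is processed and output_queue is closed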

Interface Details

Pipeline Segments

A segment is created with pipeline::make(). The base function must convert an input type to an output type, either directly or via a queue. Input queues and producer functions can be made into pipelines with pipeline::from(); consumer functions and output queues use pipeline::to(). The following functions all convert integer input to string output:


string direct_fn(int input);
string queue_in_fn(queue_back<int> input_queue);
void queue_out_fn(int input, queue_front<string> output_queue);
void queue_in_out_fn(queue_back<int> input_queue, queue_front<string> output_queue);

// Segment that converts ints to strings
pipeline::segment<int, string> s1;

// Can create the same segment type from any suitable function
s1 = pipeline::make(direct_fn);
s1 = pipeline::make(queue_in_fn);
s1 = pipeline::make(queue_out_fn);
s1 = pipeline::make(queue_in_out_fn);

Segments can be piped together. As before, any suitable function can be used:

Name create_name(string input);
void create_names(string input, queue_front<Name> output_queue);

// Segment that converts ints to Names. Can be created by piping "s1"
// into any suitable function
pipeline::segment<int, Name> s2;

s2 = s1 | create_name;
s2 = s1 | create_names;

The 'make()' function is used to create the correct segment type. Once this has been created, subsequent elements can be added using the overloaded '|' operator, which will return the correct type for subsequent pipeline operations. In theory we could write:

s2 = make(direct_fn) | make(create_name);
but it is simpler to write:

s2 = make(direct_fn) | create_name;

Pipeline Plans

A plan is created by terminating a segment at either end. The start terminator may either be a queue, or a function that writes to a queue_front. The end terminator may be a queue, a function that reads from a queue_back, or a function that repeatedly reads inputs. Conceptually a plan may be represented as:

(input_queue's queue_back) -> segment -> (queue_front of output_queue)

Some examples would be:

// Sources
void write_ints(queue_front<int> output_queue) {
  for(...) {
    output_queue.push(n);
  }
}

queue<int> source_queue;

// Sinks
void process_name_queue(queue_back<Name> input_queue) {
}

void process_name(Name input) {
}

queue<Name> name_output_queue;

// Possible plans using these sources and sinks, plus the "s2" segment defined above.
// (Three out of six combinations shown.)
pipeline::plan p1 = pipeline::from(source_queue) | s2 | process_name;
pipeline::plan p2 = pipeline::from(write_ints) | s2 | process_name_queue;
pipeline::plan p3 = pipeline::from(source_queue) | s2 | name_output_queue;
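
The remaining combinations follow the same pattern; for example (one more of the six, using the names defined above):

pipeline::plan p4 = pipeline::from(write_ints) | s2 | name_output_queue;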

Alternatively, if a predefined segment is not available, the entire plan can be created as follows:

// Two out of N combinations shown
pipeline::plan p1 = pipeline::from(source_queue) | queue_in_fn | create_names | process_name_queue;
pipeline::plan p2 = pipeline::from(source_queue) | direct_fn | create_name | name_output_queue;
Note that it is also possible to create a segment that is terminated at either the start or the end, and then combine that partially terminated segment into a full plan.

pipeline::segment<pipeline::terminated, Name> s3 = pipeline::from(source_queue) | s2;
pipeline::plan p3 = s3 | process_name_queue;

Plans are always created using the 'pipeline::from()' function. This is necessary to create the correct pipeline type. Additional segments can then be added using the overloaded '|' operator, which will return objects of the correct type for the next stage in the pipeline.

Running in Parallel

Any segment may be run in parallel by wrapping it in a call to parallel().


segment<int, string> s1;

// Run s1 in parallel, into a single create_names call
segment<int, Name> parallel_1 = parallel(s1, n_threads) | create_names;

// Run s1 into create_names, both in parallel
segment<int, Name> parallel_2 = parallel(s1 | create_names, n_threads);

Wrapping a segment in parallel() runs multiple copies of that function (or chain of functions), each in its own thread. The n_threads argument determines the number of parallel copies.
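
Putting this together with the sources, sinks, and functions defined earlier in this section, a plan whose middle section runs in parallel might look as follows (a sketch; the count of four is arbitrary):

pipeline::plan p_par = pipeline::from(source_queue) |
                       pipeline::parallel(s1 | create_names, 4) |
                       name_output_queue;
pipeline::execution par_exec = p_par.run(&thread_pool);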

Pipeline Executions

Creating an execution causes the plan to start running immediately, just as creating a new std::thread causes the thread to begin executing.

pipeline::execution exec = p1.run(&thread_pool);
The caller can block until the execution finishes by calling wait():
pipeline::execution exec = p1.run(&thread_pool);
exec.wait();

Implementation

The open-source implementation is freely available at http://code.google.com/p/google-concurrency-library/source/browse/include/pipeline.h.

Additional Features

We are considering the following additional features:

Iterators as inputs and outputs

As well as queues for inputs and outputs we could also support iterators:

vector<string> inputs = ...
pipeline::from(inputs.begin(), inputs.end()) | grep_fn | ...
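
The output end could take a similar form; a purely speculative sketch (pipeline::to() accepting an output iterator is not part of the current interface):

vector<string> results;
(pipeline::from(inputs.begin(), inputs.end()) |
  grep_fn |
  pipeline::to(back_inserter(results))).run(&thread_pool);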

Iterators as parameters to functions

Similarly, we could support iterators as arguments to pipeline functions:

typedef iterator<input_iterator_tag, string> Input;
typedef iterator<output_iterator_tag, string> Output;

void grep(const string& re, Input first, Input last, Output out)
{
  while (first != last) {
    if (re::match(re, *first)) {
      *out = *first;
      ++out;
    }
    ++first;
  }
}

Streams as inputs and outputs

Standard iostreams could be used as input and output.
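
For example (purely speculative; stream overloads of pipeline::from() and pipeline::to() do not exist in the current implementation):

// Read lines from stdin, filter them, and write the survivors to stdout.
(pipeline::from(std::cin) | grep_fn | pipeline::to(std::cout)).run(&thread_pool);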

Internal optimizations - threading and work-stealing

The current pipeline framework uses one thread for each stage of the pipeline. To limit the use of resources, it should be possible to run with fewer threads, using work-stealing or work-sharing techniques.