Paolo Capriotti's blog

Functional programming and more

Pipes 2.0 vs pipes-core

With the release of pipes 2.0 by Gabriel Gonzalez, I feel it’s time to address the question of whether my fork will eventually be merged or not.

The short answer is no: I will continue to maintain pipes-core as a separate library. In this post, I will discuss the reasoning behind this decision, and hopefully explain the various trade-offs that the two libraries make.

The issue with termination

pipes 1.0 can be quite accurately described as “composable monadic stream processors”. “Composable” alludes to horizontal composition (i.e. the Category instance), while “monadic” refers to vertical composition.

The existence of a Monad instance has a number of consequences, the most important being that pipes can carry a “return value” and, in particular, that they can terminate.

The fact that pipes can terminate poses the greatest challenge when reasoning about the properties of (horizontal) composition, but termination is also one of the nicest features of pipes, so we want to deal with this difficulty appropriately.

Termination implies that any pipe has to deal somehow with the fact that its upstream pipe can exit before yielding a value, which basically means that an await can fail.

Gabriel’s pipes address this issue by simply “propagating termination downstream”. A pipe awaiting on a terminated pipe is forcibly terminated itself, and the upstream return value is returned.

My guarded pipes idea (later integrated into pipes-core) proposes a new primitive

tryAwait :: Pipe a b m (Maybe a)

that returns Nothing when upstream terminates before providing a value.

Using tryAwait, a pipe can handle a failure caused by upstream termination, and either return a value of its own or fall back to the upstream return value (the latter is accomplished by simply awaiting again).
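To make the behaviour of tryAwait concrete, here is a minimal sketch built on a toy model. It is not the pipes-core implementation: the base monad is dropped for brevity, and the constructor names, afterEnd, fromList, sumAll and outputs are all made up for this illustration.

```haskell
import Control.Monad (ap, liftM)

-- Toy model for illustration only; these are NOT the pipes-core definitions.
data Pipe a b r
  = Done r                            -- terminated with a return value
  | Yield b (Pipe a b r)              -- emit a value downstream
  | TryAwait (Maybe a -> Pipe a b r)  -- Nothing means upstream has terminated

instance Functor (Pipe a b) where
  fmap = liftM

instance Applicative (Pipe a b) where
  pure = Done
  (<*>) = ap

instance Monad (Pipe a b) where
  Done r     >>= f = f r
  Yield b p  >>= f = Yield b (p >>= f)
  TryAwait k >>= f = TryAwait (\x -> k x >>= f)

yield :: b -> Pipe a b ()
yield b = Yield b (Done ())

tryAwait :: Pipe a b (Maybe a)
tryAwait = TryAwait Done

-- Horizontal composition, upstream on the left, driven by downstream.
(>+>) :: Pipe a b r -> Pipe b c r -> Pipe a c r
_  >+> Done r     = Done r
up >+> Yield c p  = Yield c (up >+> p)
up >+> TryAwait k = case up of
  Yield b up' -> up' >+> k (Just b)
  TryAwait h  -> TryAwait (\x -> h x >+> TryAwait k)
  Done r      -> afterEnd r (k Nothing)

-- Upstream has returned r. Downstream may finish its work, but if it
-- awaits again the pipeline ends with the upstream return value.
afterEnd :: r -> Pipe b c r -> Pipe a c r
afterEnd _ (Done r')    = Done r'
afterEnd r (Yield c p)  = Yield c (afterEnd r p)
afterEnd r (TryAwait _) = Done r

-- Hypothetical example pipes.
fromList :: [b] -> Pipe a b ()
fromList = mapM_ yield

-- Sums its input and emits the total once upstream is exhausted.
sumAll :: Pipe Int Int ()
sumAll = go 0
  where
    go acc = tryAwait >>= maybe (yield acc) (\x -> go (acc + x))

-- Collect the outputs of a pipeline that has no upstream.
outputs :: Pipe a b r -> ([b], r)
outputs (Done r)     = ([], r)
outputs (Yield b p)  = let (bs, r) = outputs p in (b : bs, r)
outputs (TryAwait k) = outputs (k Nothing)
```

In GHCi, `fst (outputs (fromList [1, 2, 3] >+> sumAll))` evaluates to `[6]`: sumAll sees Nothing when the source is exhausted and gets the chance to emit its result, which a pipe that is forcibly terminated on a failed await could not do.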

Exception handling

Once you realize that pipes should be able to handle failure on await, it becomes very natural to extend the idea to other kinds of failure.

That’s exactly the rationale behind pipes-core. It introduces slightly more involved primitives that take into account the fact that actions in the base monad, as well as pipes themselves, can throw an exception at any time.

One very interesting consequence of built-in exception handling is that the “guarded pipes” concept can be integrated seamlessly by introducing a special BrokenPipe exception.

The exception handling implementation in pipes-core works in any monad, and deals with asynchronous exceptions correctly. Of course, actual exceptions thrown from Haskell code can only be caught when the base monad is IO.

What about finalization?

Since all the finalization primitives in Control.Exception are implemented on top of exception handling primitives like catch and mask, I initially believed that finalization would follow automatically from exception handling capabilities in pipes.

Unfortunately, there is a fundamental operational difference between IO and Pipe, which makes exception handling alone insufficient to guarantee finalization of resources.

The problem is that some of the pipes in a pipeline are not guaranteed to be executed at all. In fact, a pipe only plays a role in pipeline execution if its downstream pipe awaits at some point (or if it is the last one).

The same applies to “portions” of pipes, so a pipe can execute partially, and then be completely forgotten, even if no exceptional condition occurs.
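The following sketch illustrates the problem with a toy model. It makes no claim about how either library is implemented: base-monad actions are replaced by a hypothetical Act constructor that merely records a label, and source, takeOne and actions are names invented for the example.

```haskell
-- Toy model for illustration only; not the pipes or pipes-core types.
data Pipe a b r
  = Done r
  | Yield b (Pipe a b r)
  | Await (a -> Pipe a b r)
  | Act String (Pipe a b r)  -- stand-in for an action in the base monad

-- Composition in the style of pipes 1.0: downstream drives, and the
-- termination of either side ends the whole pipeline.
(>+>) :: Pipe a b r -> Pipe b c r -> Pipe a c r
_  >+> Done r    = Done r
up >+> Yield c p = Yield c (up >+> p)
up >+> Act s p   = Act s (up >+> p)
up >+> Await k   = case up of
  Done r      -> Done r
  Yield b up' -> up' >+> k b
  Act s up'   -> Act s (up' >+> Await k)
  Await h     -> Await (\x -> h x >+> Await k)

-- "Opens" a resource, yields three lines, then "closes" the resource.
source :: Pipe () String ()
source =
  Act "open" (Yield "l1" (Yield "l2" (Yield "l3" (Act "close" (Done ())))))

-- Forwards a single element and stops.
takeOne :: Pipe String String ()
takeOne = Await (\x -> Yield x (Done ()))

-- The actions that actually run, in order.
actions :: Pipe () b r -> [String]
actions (Done _)    = []
actions (Yield _ p) = actions p
actions (Act s p)   = s : actions p
actions (Await k)   = actions (k ())
```

Here `actions source` is `["open", "close"]`, but `actions (source >+> takeOne)` is just `["open"]`. No exception was thrown anywhere; the rest of source, including its cleanup step, is simply never reached once takeOne returns.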

After a number of failed attempts (including the broken 0.0.1 release of pipes-core), I realized that Gabriel’s finalizer passing idea was the right one, and used it to replace my flawed ensure primitive.

Balancing safety and dynamicity

The question remains of how to guarantee that a pipe never awaits again after its upstream terminated.

My solution is dynamic: if upstream terminated because of an exception (that has been handled), just throw the exception again on await; if upstream terminated normally, throw a BrokenPipe exception.
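As a rough sketch of the dynamic rule, consider the toy model below. It is a simplification under several assumptions and not the pipes-core code: there is no base monad, exceptions are a small hypothetical PipeError type, an upstream failure is modelled as a plain Throw, and catchP, collect, source and run are illustrative names. It also shows how a guarded tryAwait can be derived from await by catching BrokenPipe.

```haskell
import Control.Monad (ap, liftM)

-- Toy model for illustration only; not the pipes-core definitions.
data PipeError = BrokenPipe | Failure String
  deriving (Eq, Show)

data Pipe a b r
  = Done r
  | Throw PipeError
  | Yield b (Pipe a b r)
  | Await (Either PipeError a -> Pipe a b r)

instance Functor (Pipe a b) where
  fmap = liftM

instance Applicative (Pipe a b) where
  pure = Done
  (<*>) = ap

instance Monad (Pipe a b) where
  Done r    >>= f = f r
  Throw e   >>= _ = Throw e
  Yield b p >>= f = Yield b (p >>= f)
  Await k   >>= f = Await (\x -> k x >>= f)

yield :: b -> Pipe a b ()
yield b = Yield b (Done ())

-- A failed await surfaces as an exception inside the pipe.
await :: Pipe a b a
await = Await (either Throw Done)

catchP :: Pipe a b r -> (PipeError -> Pipe a b r) -> Pipe a b r
catchP (Done r)    _ = Done r
catchP (Throw e)   h = h e
catchP (Yield b p) h = Yield b (catchP p h)
catchP (Await k)   h = Await (\x -> catchP (k x) h)

-- Guarded await: BrokenPipe becomes Nothing, other errors are rethrown.
tryAwait :: Pipe a b (Maybe a)
tryAwait = fmap Just await `catchP` handler
  where
    handler BrokenPipe = return Nothing
    handler e          = Throw e

-- The dynamic rule: awaiting on a finished upstream throws BrokenPipe,
-- awaiting on a failed upstream rethrows the original error.
(>+>) :: Pipe a b r -> Pipe b c r -> Pipe a c r
_  >+> Done r    = Done r
_  >+> Throw e   = Throw e
up >+> Yield c p = Yield c (up >+> p)
up >+> Await k   = case up of
  Yield b up' -> up' >+> k (Right b)
  Await h     -> Await (\x -> h x >+> Await k)
  Done _      -> up >+> k (Left BrokenPipe)
  Throw e     -> up >+> k (Left e)

-- Hypothetical example pipes and runner.
source :: [b] -> Pipe a b ()
source = foldr Yield (Done ())

-- Gathers all input and emits it as one list at end of input.
collect :: Pipe a [a] ()
collect = go []
  where
    go acc = tryAwait >>= maybe (yield (reverse acc)) (\x -> go (x : acc))

run :: Pipe a b r -> Either PipeError [b]
run (Done _)    = Right []
run (Throw e)   = Left e
run (Yield b p) = fmap (b :) (run p)
run (Await k)   = run (k (Left BrokenPipe))  -- no upstream at all
```

With these definitions, `run (source [1, 2, 3] >+> collect)` gives `Right [[1,2,3]]`, while a downstream pipe that awaits past the end of input without guarding, such as `await >> await >>= yield` fed by `source [1]`, ends in `Left BrokenPipe`. Nothing in the types prevents the second await; the violation is only detected when it happens.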

Gabriel’s solution is static: a pipe is not allowed to await again after termination, and the invariant is enforced by the types.

The static solution has obvious advantages, but, on closer inspection, it reveals a number of downsides:

  1. It prevents Pipe from forming a Monad; the solution implemented in pipes 2.0 is to separate the Monad instance from the Category instance, and to suggest that the Monad instance should actually be replaced with an indexed monad.
  2. It doesn’t provide any exception handling mechanism, and doesn’t guarantee that finalizers will be called in case any exception occurs. I imagine that some sort of exception support could be layered on top of the current solution, but I’m guessing it’s not going to be straightforward.
  3. Folds are not compositional. This can be clearly seen in the tutorial, where strict is not defined in terms of toList. With pipes-core, you would simply have:
strict = consume >>= mapM_ yield
-- note that toList is called consume in pipes-core
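To show what “compositional” means for folds, here is a small sketch on a toy pipe type. The type, the feed runner and sumPipe are assumptions made for the example (the real pipes-core Pipe also carries a base monad); only the names consume, strict, yield and tryAwait come from the text above.

```haskell
import Control.Monad (ap, liftM)

-- Toy model for illustration only; not the pipes-core definitions.
data Pipe a b r
  = Done r
  | Yield b (Pipe a b r)
  | TryAwait (Maybe a -> Pipe a b r)  -- Nothing means end of input

instance Functor (Pipe a b) where
  fmap = liftM

instance Applicative (Pipe a b) where
  pure = Done
  (<*>) = ap

instance Monad (Pipe a b) where
  Done r     >>= f = f r
  Yield b p  >>= f = Yield b (p >>= f)
  TryAwait k >>= f = TryAwait (\x -> k x >>= f)

yield :: b -> Pipe a b ()
yield b = Yield b (Done ())

tryAwait :: Pipe a b (Maybe a)
tryAwait = TryAwait Done

-- A fold: the collected input is the pipe's return value.
consume :: Pipe a b [a]
consume = go []
  where
    go acc = tryAwait >>= maybe (return (reverse acc)) (\x -> go (x : acc))

-- Built from the fold with plain monadic bind. mapM_ is used so that
-- the result is () rather than a list of units.
strict :: Pipe a a ()
strict = consume >>= mapM_ yield

-- Another pipe reusing the same fold (hypothetical example).
sumPipe :: Pipe Int Int ()
sumPipe = fmap sum consume >>= yield

-- Feed a list of inputs, then signal end of input; collect the outputs.
feed :: [a] -> Pipe a b r -> [b]
feed _        (Done _)     = []
feed xs       (Yield b p)  = b : feed xs p
feed (x : xs) (TryAwait k) = feed xs (k (Just x))
feed []       (TryAwait k) = feed [] (k Nothing)
```

For instance, `feed [1, 2, 3] strict` evaluates to `[1,2,3]` and `feed [1, 2, 3] sumPipe` to `[6]`. The fold can keep running after its input ends precisely because the end of input is observable from inside the pipe.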

What’s next for pipes-core

The current version of pipes-core already provides exception handling and guaranteed finalization in the face of asynchronous exceptions. Things that could be improved in its finalization support are:

  1. Finalization is currently guaranteed, but not always prompt. When an exception handler is provided, upstream finalization gets delayed unnecessarily.
  2. It is not possible to prematurely force finalization. I haven’t yet seen an example where this would be useful, but it would be nice to have it for completeness.

I think I know how these points can be addressed, and hopefully they will make it into the next release.

For future releases, I’d like to focus on performance. Aside from micro-optimizations, I can see two main areas that would benefit from improvements: the Monad instance and the Category instance.

The current monadic bind unfortunately exhibits quadratic behavior on left-nested binds, since it basically works like a naive list concatenation function. The Codensity transformation should address that.
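The sketch below illustrates the idea on a stripped-down, output-only pipe. It is an assumption-laden toy, not the planned pipes-core change: Producer, yieldC, lower and countTo are invented names, and Codensity is defined locally rather than imported from a library.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (ap, liftM)

-- Toy output-only pipe; not a pipes-core type.
data Producer b r = Done r | Yield b (Producer b r)

instance Functor (Producer b) where
  fmap = liftM

instance Applicative (Producer b) where
  pure = Done
  (<*>) = ap

instance Monad (Producer b) where
  Done r    >>= f = f r
  -- Walks the whole left operand, just like (++) on lists.
  Yield b p >>= f = Yield b (p >>= f)

-- Continuation-passing wrapper that reassociates binds to the right.
newtype Codensity m a = Codensity
  { runCodensity :: forall r. (a -> m r) -> m r }

instance Functor (Codensity m) where
  fmap = liftM

instance Applicative (Codensity m) where
  pure a = Codensity (\k -> k a)
  (<*>) = ap

instance Monad (Codensity m) where
  m >>= f = Codensity (\k -> runCodensity m (\a -> runCodensity (f a) k))

yield :: b -> Producer b ()
yield b = Yield b (Done ())

yieldC :: b -> Codensity (Producer b) ()
yieldC b = Codensity (\k -> Yield b (k ()))

-- Convert back to an ordinary Producer.
lower :: Codensity (Producer b) r -> Producer b r
lower m = runCodensity m Done

-- Deliberately left-nested: ((y 1 >> y 2) >> y 3) >> ...
countTo :: Monad m => (Int -> m ()) -> Int -> m ()
countTo y n = foldl (\acc i -> acc >> y i) (return ()) [1 .. n]

toList :: Producer b r -> [b]
toList (Done _)    = []
toList (Yield b p) = b : toList p
```

Both `toList (countTo yield n)` and `toList (lower (countTo yieldC n))` produce the numbers from 1 to n. In the first, every bind re-traverses the producer built so far, so the total work grows quadratically with n; in the second, the binds are only composed as continuations and the structure is built once by lower.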

For the Category instance, it would be interesting to explore whether it is possible to achieve some form of fusion of intermediate data structures, similarly to classic stream fusion for lists.

This is probably going to be more of a challenge, and will likely require some significant restructuring, but the prospective benefits are enormous. There is some research on this topic and an initial attempt I plan to draw ideas from.

My last point is about the absence of an unawait primitive for Pipe. There has been quite a lot of discussion on this topic, but I remain unconvinced that having built-in parsing capabilities is a good thing.

Whenever there is a need to chain unconsumed input, there are a few viable options already:

  1. Return leftover data, and add some manual wiring so that it’s passed to the “next” pipe.
  2. Use PutbackPipe from pipes-extra.
  3. Use an actual parser library and convert the parser to a Pipe (see pipes-attoparsec).

In all the examples I have seen, however, pipes are composable enough that all the special logic to deal with boundaries of chunked streams can be implemented in a single “filter” pipe, and the rest of the pipeline can ignore the issue altogether.
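As a small illustration of that last point, the sketch below re-chunks arbitrary string chunks into lines. A plain list function stands in for the “filter” pipe here, and the names step and relines are hypothetical; the point is only that the leftover from one chunk is carried as local state, so nothing downstream needs to know about chunk boundaries.

```haskell
-- Given the leftover from the previous chunk and a new chunk, return the
-- complete lines found so far and the new unfinished remainder.
step :: String -> String -> ([String], String)
step leftover chunk =
  case break (== '\n') chunk of
    (part, '\n' : rest) ->
      let (ls, leftover') = step "" rest
      in ((leftover ++ part) : ls, leftover')
    (part, _) -> ([], leftover ++ part)

-- Turn a stream of chunks into a stream of lines. A trailing partial
-- line is emitted when the input ends.
relines :: [String] -> [String]
relines = go ""
  where
    go leftover []       = [leftover | not (null leftover)]
    go leftover (c : cs) =
      let (ls, leftover') = step leftover c
      in ls ++ go leftover' cs
```

For example, `relines ["ab\ncd", "ef\n", "gh"]` evaluates to `["ab","cdef","gh"]`: the line split across the first two chunks is reassembled inside the filter.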