An introduction to guarded pipes

Pipes are a very simple but powerful abstraction which can be used to implement stream-based IO, in a very similar fashion to iteratees and friends, or conduits. In this post, I introduce guarded pipes: a slight generalization of pipes which makes it possible to implement a larger class of combinators.

{-# LANGUAGE NoMonomorphismRestriction #-}
module Blog.Pipes.Guarded where

import Control.Category
import Control.Monad.Free
import Control.Monad.Identity
import Data.Maybe
import Data.Void
import Prelude hiding (id, (.), until, filter)

The idea behind pipes is straightfoward: fix a base monad m, then construct the free monad over a specific PipeF functor:

data PipeF a b m x = M (m x)
                   | Yield b x
                   | Await (Maybe a -> x)

instance Monad m => Functor (PipeF a b m) where
  fmap f (M m) = M $ liftM f m
  fmap f (Yield x c) = Yield x (f c)
  fmap f (Await k) = Await (f . k)

type Pipe a b m r = Free (PipeF a b m) r

Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (PipeF in this case), is a primitive operation, while the x parameter represents the continuation at each step.

In the case of pipes, M corresponds to an effect in the base monad, Yield produces an output value, and Await blocks until it receives an input value, then passes it to its continuation. You can see that the Await continuation takes a Maybe a type: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that Await will receive Nothing whenever the pipe runs out of input values. That will give it a chance to do some cleanup or yield extra outputs. Any additional Await after that point will terminate the pipe immediately.

We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:

evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p

The boolean parameter is going to be set to True as soon as we execute an Await with an empty input list.

A Pure value means that the pipe has terminated spontaneously, so we return the accumulated output list:

  where
    go _ _ ys (Pure r) = return (reverse ys)

Execute inner monadic effects:

    go t xs ys (Free (M m)) = m >>= go t xs ys

Save yielded values into the accumulator:

    go t xs ys (Free (Yield y c)) = go t xs (y : ys) c

If we still have values in the input list, feed one to the continuation of an Await statement.

    go t (x:xs) ys (Free (Await k)) = go t xs ys $ k (Just x)

If we run out of inputs, pass Nothing to the Await continuation…

    go False [] ys (Free (Await k)) = go True [] ys (k Nothing)

… but only the first time. If the pipe awaits again, terminate it.

    go True [] ys (Free (Await _)) = return (reverse ys)

To simplify the implementation of actual pipes, we define the following basic combinators:

tryAwait :: Monad m => Pipe a b m (Maybe a)
tryAwait = wrap $ Await return

yield :: Monad m => b -> Pipe a b m ()
yield x = wrap $ Yield x (return ())

lift :: Monad m => m r -> Pipe a b m r
lift = wrap . M . liftM return

and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:

discard :: Monad m => Pipe a b m r
discard = forever tryAwait

then a simplified await primitive, that dies as soon as we stop feeding values to it.

await :: Monad m => Pipe a b m a
await = tryAwait >>= maybe discard return

now we can write a very simple pipe that sums consecutive pairs of numbers:

sumPairs :: (Monad m, Num a) => Pipe a a m ()
sumPairs = forever $ do
  x <- await
  y <- await
  yield $ x + y

we get:

ex1 :: [Int]
ex1 = runIdentity $ evalPipe sumPairs [1,2,3,4]
{- ex1 == [3, 7] -}

Composing pipes

The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the await and yield primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.

infixl 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>+>) = go False False
  where

When implementing evalPipe, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:

    go _ _ p1 (Pure r) = return r
    go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
    go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2

then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:

    go t1 t2 (Free (Yield x c)) (Free (Await k)) =
      go t1 t2 c $ k (Just x)

if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:

    go t1 t2 (Free (M m)) p2@(Free (Await _)) =
      lift m >>= \p1 -> go t1 t2 p1 p2

if upstream terminates while downstream is waiting, finalize downstream:

    go t1 False p1@(Pure _) (Free (Await k)) =
      go t1 True p1 (k Nothing)

but if downstream awaits again, terminate the whole composite pipe:

    go _ True (Pure r) (Free (Await _)) = return r

now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is Nothing, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.

    go False t2 (Free (Await k)) p2@(Free (Await _)) =
      tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
    go True False p1@(Free (Await _)) (Free (Await k)) =
      go True True p1 (k Nothing)
    go True True p1@(Free (Await _)) p2@(Free (Await _)) =
      tryAwait >>= \_ -> {- unreachable -} go True True p1 p2

This composition can be shown to be associative (in a rather strong sense), with identity given by:

idP :: Monad m => Pipe a a m r
idP = forever $ await >>= yield

So we can define a Category instance:

newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }

instance Monad m => Category (PipeC m r) where
  id = PipeC idP
  (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2

Running pipes

A runnable pipe, also called Pipeline, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:

type Pipeline m r = Pipe () Void m r

Disregarding bottom, calling await on such a pipe does not return any useful value, and yielding is impossible. Another way to think of Pipeline is as an arrow (in PipeC) from the terminal object to the initial object of Hask1.

Running a pipeline is straightforward:

runPipe :: Monad m => Pipeline m r -> m r
runPipe (Pure r) = return r
runPipe (Free (M m)) = m >>= runPipe
runPipe (Free (Await k)) = runPipe $ k (Just ())
runPipe (Free (Yield x c)) = absurd x

where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.

The three primitive operations tryAwait, yield and lift, together with pipe composition and the runPipe function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter evalPipe can be easily rewritten in terms of these primitives:

evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe' p xs = runPipe $
  (mapM_ yield xs >> return []) >+>
  (p >> discard) >+>
  collect id
  where
    collect xs =
      tryAwait >>= maybe (return $ xs [])
                         (\x -> collect (xs . (x:)))

Note that we use the discard pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.

Extra combinators

The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:

sourceList = mapM_ yield
sourceNull = return ()
fold f z = go z
  where
    go x = tryAwait >>= maybe (return x) (go . f x)
consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
sinkNull = discard
take n = (isolate n >> return []) >+> consume
drop n = replicateM n await >> idP
pipe f = forever $ await >>= yield . f -- called map in conduit
concatMap f = forever $ await >>= mapM_ yield . f
until p = go
  where
    go = await >>= \x -> if p x then return () else yield x >> go
groupBy (~=) = p >+>
  forever (until isNothing >+>
           pipe fromJust >+>
           (consume >>= yield))
  where 
    -- the pipe p yields Nothing whenever the current item y
    -- and the previous one x do not satisfy x ~= y, and behaves
    -- like idP otherwise
    p = await >>= \x -> yield (Just x) >> go x
    go x = do
      y <- await
      unless (x ~= y) $ yield Nothing
      yield $ Just y
      go y
isolate n = replicateM_ n $ await >>= yield
filter p = forever $ until (not . p)

To work with the equivalent of sinks, it is useful to define a source to sink composition operator:

infixr 2 $$
($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2

which ignores the source return type, and just returns the sink return value, or Nothing if the source happens to terminate first. So we have, for example:

ex2 :: Maybe [Int]
ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
{- ex2 == Just [1,2,3,4] -}

ex3 :: Maybe [Int]
ex3 = runIdentity $ sourceList [1..10] $$ discard
{- ex3 == Nothing -}

ex4 :: Maybe Int
ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4]
                >+> groupBy (==)
                >+> pipe head
                 $$ fold (+) 0
{- ex4 == Just 10 -}

ex5 :: Maybe [Int]
ex5 = runIdentity $ sourceList [1..10]
                >+> filter (\x -> x `mod` 3 == 0)
                 $$ consume
{- ex5 == Just [3, 6, 9] -}

Pipes in practice

You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked ByteStreams and utilities to convert conduits to pipes.

I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the tryAwait primitive, combinators like fold and consume cannot be implemented, nor even a simple stateful pipe like one to split a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.


  1. In reality, Hask doesn’t have an initial object, and the terminal object is actually Void, because of non-strict semantics.

Comments

Loading comments...