An introduction to guarded pipes
Pipes are a very simple but powerful abstraction which can be used to implement stream-based IO, in a very similar fashion to iteratees and friends, or conduits. In this post, I introduce guarded pipes: a slight generalization of pipes which makes it possible to implement a larger class of combinators.
    {-# LANGUAGE NoMonomorphismRestriction #-}
    module Blog.Pipes.Guarded where

    import Control.Category
    import Control.Monad.Free
    import Control.Monad.Identity
    import Data.Maybe
    import Data.Void
    import Prelude hiding (id, (.), until, filter)
The idea behind pipes is straightforward: fix a base monad m, then construct the free monad over a specific PipeF functor:
    data PipeF a b m x = M (m x)
                       | Yield b x
                       | Await (Maybe a -> x)

    instance Monad m => Functor (PipeF a b m) where
      fmap f (M m) = M $ liftM f m
      fmap f (Yield x c) = Yield x (f c)
      fmap f (Await k) = Await (f . k)

    type Pipe a b m r = Free (PipeF a b m) r
Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (PipeF in this case) is a primitive operation, while the x parameter represents the continuation at each step.
In the case of pipes, M corresponds to an effect in the base monad, Yield produces an output value, and Await blocks until it receives an input value, then passes it to its continuation. Note that the Await continuation takes an argument of type Maybe a: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that Await will receive Nothing whenever the pipe runs out of input values. That gives it a chance to do some cleanup or yield extra outputs. Any additional Await after that point will terminate the pipe immediately.
We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:
    evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
    evalPipe p xs = go False xs [] p
The boolean parameter is going to be set to True as soon as we execute an Await with an empty input list.
A Pure value means that the pipe has terminated spontaneously, so we return the accumulated output list:
      where
        go _ _ ys (Pure r) = return (reverse ys)
Execute inner monadic effects:
        go t xs ys (Free (M m)) = m >>= go t xs ys
Save yielded values into the accumulator:
        go t xs ys (Free (Yield y c)) = go t xs (y : ys) c
If we still have values in the input list, feed one to the continuation of an Await statement:
        go t (x:xs) ys (Free (Await k)) = go t xs ys $ k (Just x)
If we run out of inputs, pass Nothing to the Await continuation…
        go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
… but only the first time. If the pipe awaits again, terminate it.
        go True [] ys (Free (Await _)) = return (reverse ys)
To simplify the implementation of actual pipes, we define the following basic combinators:
    tryAwait :: Monad m => Pipe a b m (Maybe a)
    tryAwait = wrap $ Await return

    yield :: Monad m => b -> Pipe a b m ()
    yield x = wrap $ Yield x (return ())

    lift :: Monad m => m r -> Pipe a b m r
    lift = wrap . M . liftM return
and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:
    discard :: Monad m => Pipe a b m r
    discard = forever tryAwait
Then a simplified await primitive, which dies as soon as we stop feeding values to it:
    await :: Monad m => Pipe a b m a
    await = tryAwait >>= maybe discard return
Now we can write a very simple pipe that sums consecutive pairs of numbers:
    sumPairs :: (Monad m, Num a) => Pipe a a m ()
    sumPairs = forever $ do
      x <- await
      y <- await
      yield $ x + y
Running it on a short list with evalPipe, we get:
    ex1 :: [Int]
    ex1 = runIdentity $ evalPipe sumPairs [1,2,3,4]
    {- ex1 == [3, 7] -}
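To see what the Maybe in Await buys us, here is a minimal, self-contained sketch of a pipe that reacts to the end of its input. So that it only needs base, it assumes a direct recursive encoding of Pipe rather than the Free (PipeF a b m) r definition used in this post, and tryAwait, yield and evalPipe are re-stated for that encoding. The name sumAll is hypothetical and introduced only for illustration.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (runIdentity)

-- ASSUMPTION: a direct recursive encoding standing in for
-- Free (PipeF a b m) r, so that the sketch only depends on base.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Yield b (Pipe a b m r)
  | Await (Maybe a -> Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Yield y c >>= f = Yield y (c >>= f)
  Await k   >>= f = Await (\x -> k x >>= f)

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Await Pure

yield :: b -> Pipe a b m ()
yield y = Yield y (Pure ())

-- The list-based interpreter from the text, adapted to the encoding above.
evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (M m) = m >>= go t is ys
    go t is ys (Yield y c) = go t is (y : ys) c
    go t (i : is) ys (Await k) = go t is ys (k (Just i))
    go False [] ys (Await k) = go True [] ys (k Nothing)
    go True [] ys (Await _) = return (reverse ys)

-- Hypothetical example: consume every input, then yield the total once
-- Nothing signals that the input is exhausted.
sumAll :: (Monad m, Num a) => Pipe a a m ()
sumAll = go 0
  where
    go acc = tryAwait >>= maybe (yield acc) (\x -> go (acc + x))

total :: [Int]
total = runIdentity $ evalPipe sumAll [1, 2, 3, 4]
-- total == [10]
```

With an unguarded await, sumAll would simply be terminated when the input ends and the accumulated total would be lost.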
Composing pipes
The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the await and yield primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.
    infixl 9 >+>
    (>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
    (>+>) = go False False
      where
When implementing evalPipe, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:
        go _ _ p1 (Pure r) = return r
        go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
        go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2
then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:
        go t1 t2 (Free (Yield x c)) (Free (Await k)) = go t1 t2 c $ k (Just x)
if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:
        go t1 t2 (Free (M m)) p2@(Free (Await _)) = lift m >>= \p1 -> go t1 t2 p1 p2
if upstream terminates while downstream is waiting, finalize downstream:
        go t1 False p1@(Pure _) (Free (Await k)) = go t1 True p1 (k Nothing)
but if downstream awaits again, terminate the whole composite pipe:
        go _ True (Pure r) (Free (Await _)) = return r
now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is Nothing, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.
        go False t2 (Free (Await k)) p2@(Free (Await _)) =
          tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
        go True False p1@(Free (Await _)) (Free (Await k)) =
          go True True p1 (k Nothing)
        go True True p1@(Free (Await _)) p2@(Free (Await _)) =
          tryAwait >>= \_ -> {- unreachable -} go True True p1 p2
This composition can be shown to be associative (in a rather strong sense), with identity given by:
    idP :: Monad m => Pipe a a m r
    idP = forever $ await >>= yield
So we can define a Category instance:
    newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }

    instance Monad m => Category (PipeC m r) where
      id = PipeC idP
      (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2
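As a rough illustration of the composition rules above, the following self-contained sketch composes sumPairs with a second pipe and evaluates the result on a list. It assumes a direct recursive encoding of Pipe (so that only base is needed) instead of the free-monad definition from this post, and the composition clauses are transcribed to that encoding. The names double and doubledSums are hypothetical.

```haskell
import Control.Monad (ap, forever, liftM)
import Data.Functor.Identity (runIdentity)
import Data.Maybe (isNothing)

-- ASSUMPTION: direct recursive encoding standing in for Free (PipeF a b m) r.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Yield b (Pipe a b m r)
  | Await (Maybe a -> Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Yield y c >>= f = Yield y (c >>= f)
  Await k   >>= f = Await (\x -> k x >>= f)

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Await Pure

yield :: b -> Pipe a b m ()
yield y = Yield y (Pure ())

discard :: Monad m => Pipe a b m r
discard = forever tryAwait

await :: Monad m => Pipe a b m a
await = tryAwait >>= maybe discard return

-- Composition, following the clauses described in the text.
infixl 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>+>) = go False False
  where
    go _ _ _ (Pure r) = return r
    go t1 t2 p1 (Yield x c) = yield x >> go t1 t2 p1 c
    go t1 t2 p1 (M m) = M (liftM (go t1 t2 p1) m)
    go t1 t2 (Yield x c) (Await k) = go t1 t2 c (k (Just x))
    go t1 t2 (M m) p2@(Await _) = M (liftM (\p1 -> go t1 t2 p1 p2) m)
    go t1 False p1@(Pure _) (Await k) = go t1 True p1 (k Nothing)
    go _ True (Pure r) (Await _) = return r
    go False t2 (Await k) p2@(Await _) =
      tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
    go True False p1@(Await _) (Await k) = go True True p1 (k Nothing)
    go True True p1@(Await _) p2@(Await _) =
      tryAwait >>= \_ -> go True True p1 p2

-- List-based interpreter from the text, adapted to this encoding.
evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (M m) = m >>= go t is ys
    go t is ys (Yield y c) = go t is (y : ys) c
    go t (i : is) ys (Await k) = go t is ys (k (Just i))
    go False [] ys (Await k) = go True [] ys (k Nothing)
    go True [] ys (Await _) = return (reverse ys)

sumPairs :: (Monad m, Num a) => Pipe a a m ()
sumPairs = forever $ do
  x <- await
  y <- await
  yield (x + y)

-- Hypothetical downstream pipe: doubles every value it receives.
double :: (Monad m, Num a) => Pipe a a m ()
double = forever $ await >>= yield . (* 2)

doubledSums :: [Int]
doubledSums = runIdentity $ evalPipe (sumPairs >+> double) [1, 2, 3, 4]
-- doubledSums == [6, 14]
```

Each pair sum produced by sumPairs is handed directly to the waiting double pipe, which is the "upstream is yielding and downstream is waiting" clause in action.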
Running pipes
A runnable pipe, also called Pipeline, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:
    type Pipeline m r = Pipe () Void m r
Disregarding bottom, calling await on such a pipe does not return any useful value, and yielding is impossible. Another way to think of Pipeline is as an arrow (in PipeC) from the terminal object to the initial object of Hask [1].
Running a pipeline is straightforward:
    runPipe :: Monad m => Pipeline m r -> m r
    runPipe (Pure r) = return r
    runPipe (Free (M m)) = m >>= runPipe
    runPipe (Free (Await k)) = runPipe $ k (Just ())
    runPipe (Free (Yield x c)) = absurd x
where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.
The three primitive operations tryAwait, yield and lift, together with pipe composition and the runPipe function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter evalPipe can be easily rewritten in terms of these primitives:
    evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
    evalPipe' p xs = runPipe $
      (mapM_ yield xs >> return []) >+>
      (p >> discard) >+>
      collect id
      where
        collect xs = tryAwait >>= maybe (return $ xs [])
                                        (\x -> collect (xs . (x:)))
Note that we use the discard pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.
Extra combinators
The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:
    sourceList = mapM_ yield
    sourceNull = return ()

    fold f z = go z
      where go x = tryAwait >>= maybe (return x) (go . f x)

    consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
    sinkNull = discard

    take n = (isolate n >> return []) >+> consume
    drop n = replicateM n await >> idP

    pipe f = forever $ await >>= yield . f -- called map in conduit
    concatMap f = forever $ await >>= mapM_ yield . f

    until p = go
      where go = await >>= \x -> if p x then return () else yield x >> go

    groupBy (~=) = p >+>
      forever (until isNothing >+>
               pipe fromJust >+>
               (consume >>= yield))
      where
        -- the pipe p yields Nothing whenever the current item y
        -- and the previous one x do not satisfy x ~= y, and behaves
        -- like idP otherwise
        p = await >>= \x -> yield (Just x) >> go x
        go x = do
          y <- await
          unless (x ~= y) $ yield Nothing
          yield $ Just y
          go y

    isolate n = replicateM_ n $ await >>= yield
    filter p = forever $ until (not . p)
To work with the equivalent of sinks, it is useful to define a source to sink composition operator:
    infixr 2 $$
    ($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
    p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2
which ignores the source return type, and just returns the sink return value, or Nothing if the source happens to terminate first. So we have, for example:
    ex2 :: Maybe [Int]
    ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
    {- ex2 == Just [1,2,3,4] -}

    ex3 :: Maybe [Int]
    ex3 = runIdentity $ sourceList [1..10] $$ discard
    {- ex3 == Nothing -}

    ex4 :: Maybe Int
    ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4] >+> groupBy (==) >+> pipe head $$ fold (+) 0
    {- ex4 == Just 10 -}

    ex5 :: Maybe [Int]
    ex5 = runIdentity $ sourceList [1..10] >+> filter (\x -> x `mod` 3 == 0) $$ consume
    {- ex5 == Just [3, 6, 9] -}
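For readers who want to experiment with the source-to-sink operator without installing the free package, here is a self-contained sketch that runs two tiny pipelines. It assumes a direct recursive encoding of Pipe rather than the free-monad definition of this post, with runPipe, composition and $$ transcribed to that encoding. The names countChars and sourceFirst are hypothetical.

```haskell
import Control.Monad (ap, forever, liftM)
import Data.Functor.Identity (runIdentity)
import Data.Maybe (isNothing)
import Data.Void (Void, absurd)

-- ASSUMPTION: direct recursive encoding standing in for Free (PipeF a b m) r.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Yield b (Pipe a b m r)
  | Await (Maybe a -> Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM
instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap
  m *> k = m >>= \_ -> k
instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Yield y c >>= f = Yield y (c >>= f)
  Await k   >>= f = Await (\x -> k x >>= f)

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Await Pure

yield :: b -> Pipe a b m ()
yield y = Yield y (Pure ())

discard :: Monad m => Pipe a b m r
discard = forever tryAwait

-- Composition, following the clauses described earlier in the text.
infixl 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>+>) = go False False
  where
    go _ _ _ (Pure r) = return r
    go t1 t2 p1 (Yield x c) = yield x >> go t1 t2 p1 c
    go t1 t2 p1 (M m) = M (liftM (go t1 t2 p1) m)
    go t1 t2 (Yield x c) (Await k) = go t1 t2 c (k (Just x))
    go t1 t2 (M m) p2@(Await _) = M (liftM (\p1 -> go t1 t2 p1 p2) m)
    go t1 False p1@(Pure _) (Await k) = go t1 True p1 (k Nothing)
    go _ True (Pure r) (Await _) = return r
    go False t2 (Await k) p2@(Await _) =
      tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
    go True False p1@(Await _) (Await k) = go True True p1 (k Nothing)
    go True True p1@(Await _) p2@(Await _) =
      tryAwait >>= \_ -> go True True p1 p2

type Pipeline m r = Pipe () Void m r

runPipe :: Monad m => Pipeline m r -> m r
runPipe (Pure r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await k) = runPipe (k (Just ()))
runPipe (Yield x _) = absurd x

infixr 2 $$
($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2

sourceList :: Monad m => [b] -> Pipe a b m ()
sourceList = mapM_ yield

fold :: Monad m => (r -> a -> r) -> r -> Pipe a b m r
fold f = go
  where
    go acc = tryAwait >>= maybe (return acc) (go . f acc)

-- The sink sees Nothing when the source ends and returns its accumulator.
countChars :: Maybe Int
countChars = runIdentity $ sourceList "hello" $$ fold (\n _ -> n + 1) 0
-- countChars == Just 5

-- discard never returns on its own, so the source terminates first.
sourceFirst :: Maybe ()
sourceFirst = runIdentity $ sourceList [1, 2, 3 :: Int] $$ discard
-- sourceFirst == Nothing
```

The second result shows the Nothing case: a sink that keeps awaiting after it has been finalized is cut off, and the return value comes from the source side of the composition.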
Pipes in practice
You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked ByteStreams and utilities to convert conduits to pipes.
I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the tryAwait primitive, combinators like fold and consume cannot be implemented, nor even a simple stateful pipe like one to split a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.
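The line-splitting pipe mentioned above could look roughly like the following sketch. It is self-contained and, to stay within base, assumes a direct recursive encoding of Pipe and plain String chunks instead of the free-monad definition and ByteString-like chunks; splitLines and splitExample are hypothetical names, not part of any package.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (runIdentity)

-- ASSUMPTION: direct recursive encoding standing in for Free (PipeF a b m) r.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Yield b (Pipe a b m r)
  | Await (Maybe a -> Pipe a b m r)

instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

instance Monad m => Applicative (Pipe a b m) where
  pure = Pure
  (<*>) = ap

instance Monad m => Monad (Pipe a b m) where
  Pure r    >>= f = f r
  M m       >>= f = M (liftM (>>= f) m)
  Yield y c >>= f = Yield y (c >>= f)
  Await k   >>= f = Await (\x -> k x >>= f)

tryAwait :: Pipe a b m (Maybe a)
tryAwait = Await Pure

yield :: b -> Pipe a b m ()
yield y = Yield y (Pure ())

-- List-based interpreter from the text, adapted to this encoding.
evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    go _ _ ys (Pure _) = return (reverse ys)
    go t is ys (M m) = m >>= go t is ys
    go t is ys (Yield y c) = go t is (y : ys) c
    go t (i : is) ys (Await k) = go t is ys (k (Just i))
    go False [] ys (Await k) = go True [] ys (k Nothing)
    go True [] ys (Await _) = return (reverse ys)

-- Hypothetical stateful pipe: re-chunks arbitrary String chunks into lines.
splitLines :: Monad m => Pipe String String m ()
splitLines = go ""
  where
    -- buf holds the unfinished line left over from previous chunks.
    go buf = tryAwait >>= maybe (flush buf) (\chunk -> emit (buf ++ chunk))
    -- Yield every complete line, then wait for more input with the remainder.
    emit s = case break (== '\n') s of
      (line, _ : rest) -> yield line >> emit rest
      (partial, [])    -> go partial
    -- At end of input, emit the trailing partial line, if any.
    flush buf
      | null buf  = return ()
      | otherwise = yield buf

splitExample :: [String]
splitExample = runIdentity $ evalPipe splitLines ["foo\nba", "r\nba", "z"]
-- splitExample == ["foo", "bar", "baz"]
```

The final "baz" is only emitted because the pipe gets a chance to run flush when it receives Nothing; with an await that just terminates, the trailing partial line would be dropped.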
[1] In reality, Hask doesn’t have an initial object, and the terminal object is actually Void, because of non-strict semantics.