Sunday, December 30, 2007

Pipelines Using Fibers in Ruby 1.9

Users of the command line are familiar with the idea of building pipelines: a chain of simple commands strung together so that the output of one becomes the input of the next. Using pipelines and a basic set of primitives, shell users can accomplish some sophisticated tasks. Here’s a basic Unix shell pipeline that reports the ten longest .tip files in the current directory, based on the number of lines in each file:


 wc -l *.tip | grep \.tip | sort -n | tail -10

Let’s see how to add something similar to Ruby. By the end of this set of two articles, we’ll be able to write things like


puts (even_numbers | tripler | incrementer | multiple_of_five ).resume

and a palindrome finder using blocks:


    words         = Pump.new %w{Madam, the civic radar rotator is not level.}
    is_palindrome = Filter.new {|word| word == word.reverse}

    pipeline = words .| {|word| word.downcase.tr("^a-z", '') } .| is_palindrome

    while word = pipeline.resume
      puts word
    end

Great code? Nope. But getting there is fun. And, who knows? The techniques might well be useful in your next project.


A Daily Dose of Fiber


Ruby 1.9 adds support for Fibers. At their most basic, they let you create simple generators (much as you could do previously with blocks). Here’s a trivial example: a fiber that generates successive Fibonacci numbers:


    fib = Fiber.new do
      f1 = f2 = 1
      loop do
        Fiber.yield f1
        f1, f2 = f2, f1 + f2
      end
    end

    10.times { puts fib.resume }

A fiber is somewhat like a thread, except you have control over when it gets scheduled. Initially, a fiber is suspended. When you resume it, it runs the block until the block finishes, or it hits a Fiber.yield. This is similar to a regular block yield: it suspends the fiber and passes control back to the resume. Any value passed to Fiber.yield becomes the value returned by resume.
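
The lifecycle just described can be traced with a small sketch. The method name trace_fiber and the log array are illustrative only and not part of the code in this article. The sketch aims to show that nothing in the block runs until the first resume, that each Fiber.yield hands its argument back to resume, and that the block's final value comes back from the last resume.

```ruby
# Illustrative helper (hypothetical name): records the order in which
# the code inside and outside a fiber actually runs.
def trace_fiber
  log = []
  fiber = Fiber.new do
    log << "started"
    Fiber.yield :first    # suspend; :first is returned by the first resume
    log << "resumed"
    Fiber.yield :second   # suspend again
    log << "finishing"
    :done                 # the block's value is returned by the last resume
  end
  log << "created"        # the block has not run yet: the fiber starts suspended

  results = Array.new(3) { fiber.resume }
  [log, results]
end

log, results = trace_fiber
p log      # order of execution
p results  # values handed back by each resume
```

Resuming the fiber a fourth time would raise a FiberError, because by then the block has finished.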


By default, a fiber can only yield back to the code that resumed it. However, if you require the "fiber" library, Fibers get extended with a transfer method that allows one fiber to transfer control to another. Fibers then become fully fledged coroutines. However, we won’t be needing all that power today.


Instead, let’s get back to the idea of creating pipelines of functionality in code, much as you can create pipelines in the shell.


As a starting point, let’s write two fibers. One’s a generator: it produces an endless sequence of even numbers. The second is a consumer. All it does is accept values from the generator and print them. We’ll make the consumer stop after printing 10 numbers.


    evens = Fiber.new do
      value = 0
      loop do
        Fiber.yield value
        value += 2
      end
    end

    consumer = Fiber.new do
      10.times do
        next_value = evens.resume
        puts next_value
      end
    end

    consumer.resume

Note how we had to use resume to kick off the consumer. Technically, the consumer doesn’t have to be a Fiber, but, as we’ll see in a minute, making it one gives us some flexibility.
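
For comparison, here is a sketch of the same idea with a consumer that is not a Fiber. The helper name take_from is an assumption made for this example. Because it is an ordinary method, there is nothing to kick off with resume. It simply pulls values from whatever it is given.

```ruby
# The generator from the example above.
evens = Fiber.new do
  value = 0
  loop do
    Fiber.yield value
    value += 2
  end
end

# Hypothetical helper: pulls `count` values from anything that
# responds to resume and returns them as an array.
def take_from(source, count)
  Array.new(count) { source.resume }
end

puts take_from(evens, 10)
```

The trade-off is that a plain method like this can only sit at the end of a chain: nothing downstream can resume it to ask for one more value.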


As a next step, notice how we’ve created some coupling in this code. Our consumer fiber has the name of the evens generator coded into it. Let’s wrap each fiber in a method, and pass the generator into the consumer method as a parameter.


    def evens
      Fiber.new do
        value = 0
        loop do
          Fiber.yield value
          value += 2
        end
      end
    end

    def consumer(source)
      Fiber.new do
        10.times do
          next_value = source.resume
          puts next_value
        end
      end
    end

    consumer(evens).resume
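
One way to see what the decoupling buys us is to hand the same consumer a different generator. The odds method below is a hypothetical addition for this sketch and does not appear elsewhere in the article. The consumer is the one shown above, unchanged.

```ruby
# Hypothetical second generator (not from the article): 1, 3, 5, ...
def odds
  Fiber.new do
    value = 1
    loop do
      Fiber.yield value
      value += 2
    end
  end
end

# The consumer from above: all it knows is that source responds to resume.
def consumer(source)
  Fiber.new do
    10.times do
      next_value = source.resume
      puts next_value
    end
  end
end

consumer(odds).resume
```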

OK. Let’s add one more fiber to the weave. We’ll create a filter that only passes on numbers that are multiples of three. Again, we’ll wrap it in a method.


    def evens
      Fiber.new do
        value = 0
        loop do
          Fiber.yield value
          value += 2
        end
      end
    end

    def multiples_of_three(source)
      Fiber.new do
        loop do
          next_value = source.resume
          Fiber.yield next_value if next_value % 3 == 0
        end
      end
    end

    def consumer(source)
      Fiber.new do
        10.times do
          next_value = source.resume
          puts next_value
        end
      end
    end

    consumer(multiples_of_three(evens)).resume

Running this, we get the output


0
6
12
18
. . .
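
It can help to see how much work each stage does to produce that output. The sketch below is an illustration with assumed names (traced_evens, first_multiples, and the pulled array are not part of the article). It records every value the generator hands over, so we can compare what was generated with what the filter passed on.

```ruby
# Like evens, but records each value it generates in `pulled`.
def traced_evens(pulled)
  Fiber.new do
    value = 0
    loop do
      pulled << value
      Fiber.yield value
      value += 2
    end
  end
end

# The filter from the article, unchanged.
def multiples_of_three(source)
  Fiber.new do
    loop do
      next_value = source.resume
      Fiber.yield next_value if next_value % 3 == 0
    end
  end
end

# Pull `count` values through the filter and report both sides.
def first_multiples(count)
  pulled = []
  filter = multiples_of_three(traced_evens(pulled))
  passed = Array.new(count) { filter.resume }
  [pulled, passed]
end

pulled, passed = first_multiples(4)
puts "generated: #{pulled.inspect}"
puts "passed on: #{passed.inspect}"
```

Each resume of the filter may resume the generator several times before it has something to yield, and the generator never runs ahead of demand: it stops at 18, the last value the filter needed.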

This is getting cool. We write little chunks of code, and then combine them to get work done. Just like a pipeline. Except…


We can do better. First, the composition looks backwards. Because each method takes its source as an argument, we write


    consumer(multiples_of_three(evens))

Instead, we’d like to write


    evens | multiples_of_three | consumer

Also, there’s a fair amount of duplication in this code. Each of our little pipeline methods has the same overall structure, and each is coupled to the implementation of fibers. Let’s see if we can fix this.


Wrapping Fibers


As is usual when we’re refactoring towards a solution, we’re about to get really messy. Don’t worry, though. It will all wash off, and we’ll end up with something a lot neater.


First, let’s create a class that represents something that can appear in our pipeline. At its heart is the process method. This reads something from the input side of the pipe, then “handles” that value. The default handling is to write that value to the output side of the pipeline, passing it on to the next element in the chain.


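    class PipelineElement

      # The upstream element (anything that responds to resume)
      attr_accessor :source

      def initialize
        # Delegate to a fiber rather than subclassing Fiber
        @fiber_delegate = Fiber.new do
          process
        end
      end

      def resume
        @fiber_delegate.resume
      end

      # Main loop: pull values from upstream until one is nil or false
      def process
        while value = input
          handle_value(value)
        end
      end

      # Default handling: pass the value through unchanged
      def handle_value(value)
        output(value)
      end

      def input
        source.resume
      end

      def output(value)
        Fiber.yield(value)
      end
    end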
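
To watch the default behavior in action, here is a sketch that feeds a bare PipelineElement from a plain Fiber. The three_letters and drain helpers are assumptions made for this example. It relies on two things visible in the class: input only ever calls source.resume, so any object with a resume method can act as a source, and the while loop in process stops as soon as the source hands over nil.

```ruby
class PipelineElement
  attr_accessor :source

  def initialize
    @fiber_delegate = Fiber.new do
      process
    end
  end

  def resume
    @fiber_delegate.resume
  end

  def process
    while value = input
      handle_value(value)
    end
  end

  def handle_value(value)
    output(value)
  end

  def input
    source.resume
  end

  def output(value)
    Fiber.yield(value)
  end
end

# Hypothetical source: a bare fiber that yields three values and then ends.
def three_letters
  Fiber.new do
    %w[a b c].each { |letter| Fiber.yield letter }
    nil # the block's final value is what the last resume returns
  end
end

# Hypothetical helper: resume an element until it returns nil.
def drain(element)
  results = []
  while (value = element.resume)
    results << value
  end
  results
end

element = PipelineElement.new
element.source = three_letters
p drain(element)
```

When the source runs dry, process returns, the delegate fiber finishes, and the element's own resume returns nil, so the end of input propagates down the chain.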

When I first wrote this, I was tempted to make PipelineElement a subclass of Fiber, but that leads to coupling. In the end, the pipeline elements delegate to a separate Fiber object.


The first element of the pipeline doesn’t receive any input from prior elements (because there are no prior elements), so we need to override its process method.


    class Evens < PipelineElement
      def process
        value = 0
        loop do
          output(value)
          value += 2
        end
      end
    end

    evens = Evens.new

Just to make things more interesting, we’ll create a generic MultiplesOf filter, so we can filter based on any number, and not just 3:


    class MultiplesOf < PipelineElement
      def initialize(factor)
        @factor = factor
        super()
      end

      def handle_value(value)
        output(value) if value % @factor == 0
      end
    end

    multiples_of_three = MultiplesOf.new(3)
    multiples_of_seven = MultiplesOf.new(7)

Then we just knit it all together into a pipeline:


    multiples_of_three.source = evens
    multiples_of_seven.source = multiples_of_three

    10.times do
      puts multiples_of_seven.resume
    end

We get 0, 42, 84, 126, 168, and so on as output. (Any output stream that contains 42 must be correct, so no need for any unit tests here.)
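
Joking aside, the arithmetic behind that output is easy to check. A value survives the pipeline only if it is divisible by 2, 3, and 7, which means it is a multiple of their least common multiple. The method names below are assumptions made for this sketch, which compares that shortcut with a brute-force search.

```ruby
# The step between surviving values is the least common multiple
# of every divisor applied along the pipeline.
def common_step(factors)
  factors.reduce(1) { |acc, n| acc.lcm(n) }
end

# What the pipeline should print, computed directly.
def expected_output(factors, count)
  step = common_step(factors)
  Array.new(count) { |i| i * step }
end

# The same answer the slow way: test every integer in turn.
def brute_force(factors, count)
  found = []
  n = 0
  while found.size < count
    found << n if factors.all? { |f| (n % f).zero? }
    n += 1
  end
  found
end

p expected_output([2, 3, 7], 5)
p brute_force([2, 3, 7], 5) == expected_output([2, 3, 7], 5)
```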


But we’re still a little way from our ideal of being able to pipe these puppies together. It’s a good thing that Ruby lets us override the “|” operator. Up in class PipelineElement, define a new method:


    def |(other)
      other.source = self
      other
    end

This allows us to write:


    10.times do
      puts (evens | multiples_of_three | multiples_of_seven).resume
    end

or even:


    pipeline = evens | multiples_of_three | multiples_of_seven

    10.times do
      puts pipeline.resume
    end

Cool, or what?
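
The chaining works because “|” is left-associative and our method returns its right-hand operand, so a | b | c is evaluated as (a | b) | c and the whole expression evaluates to the last element. The sketch below isolates just that wiring. Stage and upstream_names are hypothetical stand-ins for this example, with no fibers involved.

```ruby
# Hypothetical stand-in for a pipeline element: only the wiring, no fibers.
class Stage
  attr_reader :name
  attr_accessor :source

  def initialize(name)
    @name = name
  end

  # Same shape as the | method above.
  def |(other)
    other.source = self
    other
  end
end

# Walk the source links from the tail of a pipeline back to its head.
def upstream_names(stage)
  names = []
  while stage
    names << stage.name
    stage = stage.source
  end
  names
end

a, b, c = %w[a b c].map { |name| Stage.new(name) }
tail = a | b | c

p tail.equal?(c)        # the expression evaluates to the last stage
p upstream_names(tail)  # the chain of sources, tail first
```

Note that “|” modifies its right-hand operand by overwriting its source, so an element can only belong to one pipeline at a time.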


In The Next Thrilling Installment


The next post will take these basic ideas and tart them up a bit, allowing us to use blocks directly in pipelines. We’ll also reveal why the PipelineElement class we just wrote is somewhat more complicated than might seem necessary. In the meantime, here’s the full source of the code so far.


    class PipelineElement

      attr_accessor :source

      def initialize
        @fiber_delegate = Fiber.new do
          process
        end
      end

      def |(other)
        other.source = self
        other
      end

      def resume
        @fiber_delegate.resume
      end

      def process
        while value = input
          handle_value(value)
        end
      end

      def handle_value(value)
        output(value)
      end

      def input
        source.resume
      end

      def output(value)
        Fiber.yield(value)
      end
    end

    ##
    # The classes below are the elements in our pipeline
    #
    class Evens < PipelineElement
      def process
        value = 0
        loop do
          output(value)
          value += 2
        end
      end
    end

    class MultiplesOf < PipelineElement
      def initialize(factor)
        @factor = factor
        super()
      end

      def handle_value(value)
        output(value) if value % @factor == 0
      end
    end

    evens = Evens.new
    multiples_of_three = MultiplesOf.new(3)
    multiples_of_seven = MultiplesOf.new(7)

    pipeline = evens | multiples_of_three | multiples_of_seven

    10.times do
      puts pipeline.resume
    end
