# * clockwork: simple asynchronous programming
# ** background
"""
This is basically microthreads:
- define things to do as generators
- yield None to give up control
- or yield another generator!!
This last one means that your generators
can spawn sub-generators.
@TODO: allow launching threads without waiting
Usage:
w = Worker()
w.assign(yourGenerator)
# and then...
if in_loop:
w.tick()
else:
w.work()
You want stuff to happen almost at the same time
but okay to just switch back and forth really fast
if you take too long at one, then the others lock up
so try not to do that!
"""
# ** applications
# *** use case: vengeance
# *** use case: kiwi game console
# *** use case: linkwatcher
# *** use case: web server
"""
lighttpd uses this approach
"""
# * dependencies
import logging
import sched
import subprocess as _subprocess
import time
import types
import unittest
# * -- implementation --
# * goal: a simple queue of tasks
"""
Our top-level object are called Workers. It's
possible to have as many workers hanging around as you like, but you
probably only need one per program.
What a Worker does is very simple. It has a queue of
work to do, and it works through the queue as long as there's
work left to do. Here's how it looks:
"""
class WorkerTest(unittest.TestCase):
def test(self):
# Start with a Worker.
w = Worker()
# Create some work.
# For example, take this list...
x = []
# And define some work:
def doSomething():
x.append(123)
yield None # has to be a generator!
w.assign(doSomething())
# Tell the worker to start working.
w.work()
# The work will get done.
self.assertEquals([123], x)
"""
Now to make it happen.
"""
# * GeneratorStack
class GeneratorStack(object):
def __init__(self, task):
self.stack = []
self.task = task
self._done = False
def push(self, task):
self.stack.append(self.task)
self.task = task
def pop(self):
if self.stack:
self.task = self.stack.pop()
else:
self._done = True
def tick(self):
try:
next = self.task.next()
except StopIteration:
self.pop()
else:
if next is None:
pass # yield None
else:
# we can also 'yield genFunc()'
assert isinstance(next, types.GeneratorType), \
"clockwork expects you to yield None or yield a generator function!!"
self.push(next)
def isDone(self):
return self._done
# * Worker
class Worker(object):
def __init__(self):
# queue is a list of call stacks
self.queue = []
self.gen = None
def assign(self, task):
assert isinstance(task, types.GeneratorType), \
"assign expects a generator!"
self.queue.append(GeneratorStack(task))
def work(self):
while self.queue:
self.doNextTask()
def tick(self):
if self.queue:
self.doNextTask()
def doNextTask(self):
assert self.queue, "nothing in queue"
task = self.queue.pop(0)
task.tick()
if not task.isDone():
self.queue.append(task)
"""
We will extend this class as we go along.
"""
# * goal: microtheading
"""
Sometimes there's too much work to do at once so
we break the work into small pieces. It would be
great if we could use generators to 'pause' execution
of our task:
"""
class GeneratorTest(unittest.TestCase):
def test(self):
# start with an empty list
data = []
# and create a generator to append data.
# generators can be finite:
def letters():
data.append("a")
yield None
data.append("z")
yield None
# or infinite:
def numbers():
i = 0
while True:
i += 1
data.append(i)
yield None
# assign the generators to the worker
w = Worker()
w.assign(letters())
w.assign(numbers())
# now run.
w.doNextTask()
self.assertEquals(["a"], data)
w.doNextTask()
self.assertEquals(["a", 1], data)
w.doNextTask()
self.assertEquals(["a", 1, "z"], data)
w.doNextTask()
self.assertEquals(["a", 1, "z", 2], data)
# the next task is to dispose of "letters"
# so nothing should change in the data:
w.doNextTask()
self.assertEquals(["a", 1, "z", 2], data) # same!
w.doNextTask()
self.assertEquals(["a", 1, "z", 2, 3], data)
w.doNextTask()
self.assertEquals(["a", 1, "z", 2, 3, 4], data)
# * implementing generator support
"""
The basic idea here is that generators stay in the queue until they're
done. That means an infinite generator stays int he queue forever, or
at least until someone stops the program.
We still want simple tasks to be removed from the queue immediately
after they are executed, but now we want generators to stick around.
We'll have to modify Worker to make that happen.
Note that we're defining Worker as a subclass of the earlier
version of Worker. This is a trick to let us build the class
in stages. It is perfectly legal python syntax, and we will be using
it repeatedly.
"""
# * sleep
"""
sleep waits for (at least) a certain number
of seconds before resuming.
"""
def sleep(secs):
last = time.time()
while True:
now = time.time()
if (now - last) <= secs:
yield None
else:
break
# * repeat
"""
repeat is a generator for recurring tasks.
Runs the function immediately, then waits
before looping.
"""
def repeat(secs, func):
while True:
yield func()
yield sleep(secs)
# * spawn
"""
spawn a child process in the operating system.
"""
class CommandTest(unittest.TestCase):
def test(self):
w = Worker()
echo = Command(["/bin/echo", "cat"])
w.assign(echo())
w.work()
self.assertEquals("cat\n", echo.stdout.read())
self.assertEquals("", echo.stderr.read())
class Command(object):
def __init__(self, cmd):
self.cmd = cmd
def __call__(self):
proc = _subprocess.Popen(self.cmd,
stderr=_subprocess.PIPE,
stdout=_subprocess.PIPE)
while proc.poll() is None:
yield None
self.stdout = proc.stdout
self.stderr = proc.stderr
# * wrap() for non-clockwork callers
"""
Sometimes we want to call clockwork routines from normal python code.
Because of the odd conventions, a clockwork routine only makes sense
inside clockwork, so it can only be called by clockwork-aware
code. But you don't want to cut your features off from all the
non-clockwork code out there, so there has to be a simple way to take
a clockwork routine and wrap it so that it can be used by the outside
world. Well, that's what wrap does.
"""
class WrapTest(unittest.TestCase):
def test(self):
# make a clockwork function:
def routine(data):
for x in range(4):
data.append(x*x)
yield None
# without the wrapper, you get the generator:
data1 = []
assert isinstance(routine(data1), types.GeneratorType)
assert data1 == []
# but the wrapper actually runs it for you:
data2 = []
assert wrap(routine(data2)) is None
self.assertEquals([0,1,4,9], data2)
"""
The implementation is just this:
"""
def wrap(gen):
w = Worker()
w.assign(gen)
w.work()
# * return values
"""
As of python2.4, there's no easy way to pass a
value back into a running generator. We could
probably cook up some evil trickery to pass
values around on the generator stack, but since
this problem is likely to go away in python2.5,
we'll just use the simple trick of passing in
a result variable.
Later, once it's implemented, we'll use use
the new expression form of 'yield':
http://www.python.org/peps/pep-0342.html
"""
class ResultTest(unittest.TestCase):
def test(self):
ret = Result()
ret(5)
assert ~ret == 5
ret((1,2,3))
assert ~ret == (1,2,3)
class Result(object):
def __init__(self):
self.value = None
def __call__(self, value):
self.value = value
def __invert__(self):
return self.value
# * @TODO: async sockets
"""
asynchronous sockets are not implemented yet.
when i need to do this, i just leave the http
request to a child process.
"""
# * self tests
if __name__=="__main__":
unittest.main()
# * appendix: competitors
"""
twisted (yuck)
http://lgt.berlios.de/#nanothreads
http://www.python.org/peps/pep-0342.html
"""