David Mertz, Ph.D.
Selector, Gnosis Software, Inc.
May, 2003
Twisted Matrix is an increasingly popular pure-Python framework for programming network services and applications. While there are a large number of loosely-coupled modular components within Twisted Matrix, a central concept to the framework is the idea of non-blocking asynchronous servers. This style of programming is novel for developers accustomed to threading or forking servers, but also allows for great efficiency under heavy loads.
Sorting through Twisted Matrix is reminiscent of the old story about blind men and elephants. Twisted Matrix has many capabilities within it, and it takes a bit of a gestalt switch to get a good sense of why they are all there. In fact, with this first installment, I am probably only halfway toward getting my mind fully around Twisted Matrix. Readers and I can work through it together.
One of the strengths of recent versions of Python is that they come with "batteries included"--that is, the standard distribution includes modules to do just about everything you want to accomplish in most programming tasks. For the most part, when you want a 3rd party Python module or package, it is to accomplish some specialized and unusual task. Twisted Matrix is one of few exceptions to the pattern described--Twisted is a well-designed and general-purpose collection of modules for performing all manner of network programming, in ways not easily facilitated by Python's standard library.
It is not quite true that Python's standard library lacks
support for asynchronous, non-blocking network applications.
The module asyncore
provides basic support for switching
among I/O channels within a single thread. But Twisted Matrix
takes the style to a much higher level, and provides a huge
collection of pre-built and reusable protocols, interfaces and
components.
The documentation that accompanies Twisted Matrix is quite
extensive, but hard to get a handle on. Let us start with a
simple server, and build on that by several steps. In a recent
developerWorks tip, I demonstrated an XML-based "weblog server"
that presents to a client a stream of records about the latest
hits to a web server. The XML aspect is not important here,
but the use of SocketServer
and its ThreadingTCPServer class
is useful as a baseline. This pre-Twisted server consists of:
from SocketServer import BaseRequestHandler, ThreadingTCPServer from time import sleep import sys, socket from webloglib import log_fields, hit_tag class WebLogHandler(BaseRequestHandler): def handle(self): print "Connected from", self.client_address self.request.sendall('<hits>') try: while True: for hit in LOG.readlines(): self.request.sendall(hit_tag % log_fields(hit)) sleep(5) except socket.error: self.request.close() print "Disconnected from", self.client_address if __name__=='__main__': global LOG LOG = open('access-log') LOG.seek(0, 2) # Start at end of current access log srv = ThreadingTCPServer(('',8888), WebLogHandler) srv.serve_forever()
Other than that overhead of its per-client thread creation, a
notable feature of the SocketServer
based sever is its use of
a blocking call to time.sleep()
within its handler. For
Twisted Matrix' non-blocking select()
loop, such a block is
impermissible.
A first non-blocking approach pushes any artificial delays onto the client, and lets the client specifically request each new batch of weblog records (and also sends a message to indicate their absence, rather than send nothing). This Twisted server looks like:
from twisted.internet import reactor from twisted.internet.protocol import Protocol, Factory from webloglib import hit_tag, log_fields class WebLog(Protocol): def connectionMade(self): print "Connected from", self.transport.client self.transport.write('<hits>') def dataReceived(self, data): newhits = LOG.readlines() if not newhits: self.transport.write('<none/>') for hit in newhits: self.transport.write(hit_tag % log_fields(hit)) def connectionLost(self, reason): print "Disconnected from", self.transport.client factory = Factory() factory.protocol = WebLog if __name__=='__main__': global LOG LOG = open('access-log') LOG.seek(0, 2) # Start at end of current access log reactor.listenTCP(8888, factory) reactor.run()
Readers should refer to my prior tip for details on the client application. But the following change should be noted. The main client loop adds two lines:
while 1: xml_data = sock.recv(8192) parser.feed(xml_data) sleep(5) # Delay before requesting new records sock.send('NEW?') # Send signal to indicate readiness
A Twisted Matrix server consists of several modular elements.
At a bytestream level, a server implements a protocol, often by
inheriting from twisted.internet.protocol.Protocol
or from
some previously specialized child of it. For example, provided
subclasses (in twisted.protocols
) include dns
, ftp
,
gnutella
, http
, nntp
, shoutcast
, and many others.
Basically, a protocol should know how to handle making and
losing connections, and receiving and sending data within a
connection. These responsibilities are not much different than
in a SocketServer
based server, except in being slightly
more modular in defining methods for each element.
The next level of a Twisted Matrix server is a factory. In our
twisted-weblog-1.py
example, the factory really does nothing
besides store a protocol. In a more sophisticated server,
however, a factory is a good place to perform initialization
and finalization related to a protocol sever. And probably of
greatest interest, a factory can be persisted within
applications (we will see those soon).
Neither a protocol or a factory knows anything about the
network the server runs on. Instead, a reactor is a class
that actually listens on a network (utilizing a factory
instance for its protocol). Basically, a reactor is just a
loop that listens on a given port and network interface (which
one is chosen by calling a method like .listenTCP()
,
.listenSSL()
or .listenUDP()
). The thing to understand is
that the basic reactor in Twisted Matrix, SelectReactor
, runs
in a single thread--each connection is checked for new data,
and the data is delivered to the relevant protocol object. An
upshot is that a protocol object is really not allowed to
block, or even just take too long to complete (protocols must
be programmed appropriately).
Let us try to enhance the Twisted weblog server so that it
follows the pattern of SocketServer-weblog.py
in feeding new
records to clients without the need for repeated requests from
those clients. The problem here is inserting a time.sleep()
call into a method of WebLog(Protocol)
causes it to block,
and so is not allowed. While we are at it, notice that the
prior servers probably do the wrong thing in that they feed
each new batch of records only to one client. Presumably, if
you want to allow multiple clients to monitor a weblog, you
want them all to receive ongoing updates.
The way you delay actions in Twisted Matrix without blocking is
to add callbacks to a reactor, using the .callLater()
method.
A callback added this way is added to the queue of events to
service, but it will not actually be processed until after a
specified delay. Putting both changes together, an enhanced
weblog server looks like:
from twisted.internet import reactor from twisted.internet.protocol import Protocol, Factory from webloglib import hit_tag, log_fields import time class WebLog(Protocol): def connectionMade(self): print "Connected from", self.transport.client self.transport.write('<hits>') self.ts = time.time() self.newHits() def newHits(self): for hit in self.factory.records: if self.ts <= hit[0]: self.transport.write(hit_tag % log_fields(hit[1])) self.ts = time.time() reactor.callLater(5, self.newHits) def connectionLost(self, reason): print "Disconnected from", self.transport.client class WebLogFactory(Factory): protocol = WebLog def __init__(self, fname): self.fname = fname self.records = [] def startFactory(self): self.fp = open(self.fname) self.fp.seek(0, 2) # Start at end of current access log self.updateRecords() def updateRecords(self): ts = time.time() for rec in self.fp.readlines(): self.records.append((ts, rec)) self.records = self.records[-100:] # Only keep last 100 hits reactor.callLater(1, self.updateRecords) def stopFactory(self): self.fp.close() if __name__=='__main__': reactor.listenTCP(8888, WebLogFactory('access-log')) reactor.run()
In this case, we define a custom factory and move some of the
initialization from the _main_
block to the factory. Notice
also that the clients for this server need not (and should not)
sleep or send new requests--in fact, I use the exact client
application I discussed in my mentioned XML Zone tip.
The factory and the protocol use the same technique in their
custom methods .updatedRecords()
and .newHits()
,
respectively. That is if a method wants to run periodically, its
last line can schedule it to run again at a specified delay. On
its face, this pattern looks a lot like recursion--but it is not,
in fact (moreover, the repeat scheduling need not occur on the
last line; it just makes sense there). The method .newHits()
,
for example, simply lets the controlling reactor loop know that
it wants to be called in another 5 seconds, but the method itself
terminates. There is no requirement that a method schedule only
itself--it can schedule whatever it wants to occur, and
functions quite apart from factory or protocol methods can be
added to a reactor loop, if you wish.
As well as reactor.callLater()
scheduling, Twisted Matrix
contains a general class twisted.internet.defer.Deferred
. In
essense, deferreds are a generalization of scheduled callbacks,
but allow techniques such as chaining dependent callbacks and
handling error conditions in these chains. The idea behind a
Deferred
object is that when you call a method, rather than
wait for its results (which may take a while to arrive), the
method can immediately return a Deferred
object which the
reactor/scheduler can call again later, when results are expected
to be available.
I have not really played with Deferred
objects yet, but it
feels like getting them right will be slightly tricky. If you
need to wait on a blocking action-say the results from a
remote database query--it is not clear exactly how long you
will need to wait for results to be available. Deferred
objects do have a timeout mechanism, but I will have to come
back to that in a later installment. Interested readers should
at least know that the Twisted Matrix developers have attempted
to provide a standard API for wrapping blocking actions. Of
course, the worst case is to fall back to using threads for
blocking actions that really cannot be converted into
asynchronous callbacks.
Another important element to Twisted Matrix servers is their easy
support for persistence. A reactor is a loop that monitors and
responds to I/O events. An application is much like an enhanced
reactor that is able to pickle its state for later re-starting.
Moreover, applications can be statefully saved into ".tap" files,
and can be managed and daemonized using the tool twistd
. Let me
present a simple example that illustrates the usage (modelled on
the Twisted documentation's OneTimeKey
example). This server
delivers distinct Fibonacci numbers to all interested clients,
without repeating numbers between them--even if the server is
stopped and started:
from twisted.internet.app import Application from twisted.internet.protocol import Protocol, Factory class Fibonacci(Protocol): "Serve a sequence of Fibonacci numbers to all requesters" def dataReceived(self, data): self.factory.new = self.factory.a + self.factory.b self.transport.write('%d' % self.factory.new) self.factory.a = self.factory.b self.factory.b = self.factory.new def main(): import fib_server # Use script as namespace f = Factory() f.protocol = fib_server.Fibonacci f.a, f.b = 1, 1 application = Application("Fibonacci") application.listenTCP(8888, f) application.save() if '__main__' == __name__: main()
You can see that mostly all we have changed is replacing
reactor
with application
throughout. While the class
Application
also has a .run()
method, we use its .save()
method to create a Fibonacci.tap
file. Running this server is
done as:
% python fib_server.py % twistd -f Fibonacci.tap ...let server run, then shut it down... % kill `cat twistd.pid` ...re-start server where it left off... % twistd -f Fibonacci-shutdown.tap ...serve numbers where we left off...
The client that connects to this server should use a
time.sleep()
in its loop if it only wants a new number
intermittently rather than as fast as possible. Obviously, a more
useful server can provide a more intersting stateful datastream.
This article looked at fairly low-level details of Twisted Matrix--defining custom protocols, and the like. But Twisted Matrix exists at many levels--including high-level templating for web services and other common protocls. In the next installments of this series, we will start to look at web services specifically, and pick up some miscellaneous threads that were left dangling.
Twisted Matrix comes with quite a bit of documentation, and many examples. Browse around its homepage to glean a greater sense of how Twisted Matrix works, and what has been implemented with it (or wait for the next installments here):
http://twistedmatrix.com
A simple version of a weblog server was presented in the developerWorks tip, Use Simple API for XML as a long-running event processor:
http://www-106.ibm.com/developerworks/xml/library/x-tipasysax.html
Remi Delon has created a web host service called that specializes in providing Python tools and libraries (including Twisted). For a Python web programmer, having a host with the most up-to-date versions of libraries like Twisted, Zope, Webware, SkunkWeb, CherryPy, and others, is quite useful. And in regards to this article, Remi has given me a complimentary account to use for testing scripts like those in this articles:
http://python-hosting.com
David Mertz believes that it is turtles all the way down. David may be reached at [email protected]; his life pored over at http://gnosis.cx/publish/. And buy his book: http://gnosis.cx/TPiP/.