Synchronising eventlets and threads
Eventlet is an asynchronous network I/O framework which combines an event loop with greenlet-based coroutines to provide a familiar blocking-like API to the developer. One of the reasons I like eventlet a lot is that the technology it builds on allows its event loop to run inside a thread, or even to run multiple event loops in different threads. This makes it a lot more amenable to slowly evolving existing applications to be more asynchronous than solutions like gevent, which only allow one event loop per process.
Eventlet isn't the most mature of tools however, and its API shows signs of being developed as needs arose. Not that this is necessarily a bad thing, APIs do need to grow from being used, but don't be surprised if you need to dig down into some parts and discover rough edges (hi IPv6!). The API does have a decent collection of tools you'll be familiar with however: greenlet-local storage, semaphores, events (though not quite the event you're used to) and even some extra goodies like pools, WSGI servers, DBAPI2 connection pools and ZeroMQ support. But you'll notice that all these goodies are designed to work in a greenlet-only world. And the one place where threads are acknowledged, a global threadpool of workers as a last resort to make things behave asynchronously, looks very messy and entirely not reusable (it's full of module globals for one). So if you're introducing eventlet into an existing application and you need to communicate data and events between threads and eventlets you'll find a void.
So after some studying of the tpool module I decided to build a class which could synchronise between threads and eventlets. This class, which I called a Notifier, can basically be thought of as a Condition without the lock, i.e. there are three methods: .wait(), .notify() and the rather similar .notify_all(). The idea is that any thread or eventlet which calls .wait() will block (cooperatively block in the case of a greenlet) until it gets woken up by a call to one of the notifying methods. That's all there is to it.
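To make the "Condition without the lock" idea concrete before eventlet enters the picture, here is a minimal thread-only sketch of the same three-method interface. It is not the Notifier built in this post: the class name ThreadNotifier is hypothetical, it only handles threads, and it uses one threading.Event per waiter, which is an assumption of this sketch rather than anything eventlet provides.

```python
import threading


class ThreadNotifier(object):
    """Hypothetical thread-only notifier: a Condition without the lock."""

    def __init__(self):
        self._guard = threading.Lock()  # protects the waiter list only
        self._waiters = []

    def wait(self, timeout=None):
        """Block until notified; return False if the timeout expired."""
        flag = threading.Event()
        with self._guard:
            self._waiters.append(flag)
        notified = flag.wait(timeout)
        if not notified:
            with self._guard:
                if flag in self._waiters:
                    self._waiters.remove(flag)
                else:
                    # A notifier picked this waiter just as it timed out.
                    notified = True
        return notified

    def notify(self):
        """Wake up one waiter, if there is one."""
        with self._guard:
            flag = self._waiters.pop() if self._waiters else None
        if flag is not None:
            flag.set()

    def notify_all(self):
        """Wake up every current waiter."""
        with self._guard:
            flags, self._waiters = self._waiters, []
        for flag in flags:
            flag.set()
```

A caller simply does notifier.wait(timeout) in one thread and notifier.notify() in another; there is no lock to hold around either call, which is exactly what distinguishes it from a Condition.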
Building a Notifier
(Be prepared to look at the source code for eventlet.hubs.hub, threading and related code when reading this.)
Firstly the class will need to be constructed. For now there's only one interesting instance attribute and that is _waiters, which is a set which will contain all the threads and eventlets currently blocking on a call to .wait(). Don't worry yet about what goes into the set of waiters and also ignore the hubcache for now. We'll get to those later.

    def __init__(self, hubcache=GLOBAL_HUBCACHE):
        self._waiters = set()
        self.hubcache = hubcache
Now let's build the .wait() method: it needs to block until notified. But blocking is significantly different when you're running in a thread than when running in an eventlet. The basics are that in a thread you want to really block using the locking primitives provided by the OS (exposed to Python in the thread module) while an eventlet essentially wants to switch to the event loop, called the hub, in the hope someone will eventually switch back to it when it needs to wake up. These two are so different that they easily divide into two methods: .gwait() for blocking eventlets and .twait() for blocking threads.

You could call these directly obviously, but as you can see abstracting them away is not that hard. You can easily detect if you're in an eventlet by checking if the hub (which is thread-local) is actually running. The price you pay for this is that this will create a hub instance in each thread, even if it is not used. But the worst this does is waste some memory. (This could be avoided by checking for the eventlet.hubs._threadlocal.hub attribute, but that's even more internal than .get_hub().)

    def wait(self, timeout=None):
        hub = eventlet.hubs.get_hub()
        if hub.running:
            # Called from an eventlet: block cooperatively
            return self.gwait(timeout)
        else:
            # Called from a plain thread: really block
            return self.twait(timeout)
As already mentioned, the basics of blocking in an eventlet are to switch to the hub and then wait until some other greenlet running in the same thread switches back to you. So a notifier needs to have a reference to your greenlet instance so it can call .switch() on it when the time comes to wake you up. But what does another thread do? Well, it turns out another thread can ask your hub to schedule a function to run in your thread using the hub's .schedule_call_global() method, since the only thread-critical operation is an append on the hub's next_timers list, which is a thread-safe operation. Now there is another catch: remember that the hub is basically an event loop? Well, if no events happen then it will not be going round its loop! And appending something to a list does not create an event. So what you need to do is use os.pipe() so you have a file descriptor which you can register with the hub; now you can just write some data into this pipe when you want the hub of the waiter to wake up. Setting up this pipe is what the mysterious call to ._create_pipe() does; we'll see it in detail later. The rest is just simple sugar: dealing with timeouts and returning the correct values for them:

    def gwait(self, timeout=None):
        waiter = eventlet.getcurrent()
        hub = eventlet.hubs.get_hub()
        self._create_pipe(hub)
        self._waiters.add((waiter, hub))
        if timeout and timeout > 0:
            timeout = eventlet.Timeout(timeout)
            try:
                with timeout:
                    hub.switch()
            except eventlet.Timeout as t:
                if t is not timeout:
                    # Somebody else's timeout, not ours
                    raise
                self._waiters.discard((waiter, hub))
                return False
            else:
                return True
        else:
            hub.switch()
            return True
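The pipe trick is independent of eventlet, so it can be shown with nothing but the standard library. The following is a minimal sketch, assuming a Unix-like system where select() works on pipes: a plain select() call stands in for the idle hub, and a timer thread stands in for the notifying thread. The helper name wait_for_kick is hypothetical.

```python
import os
import select
import threading
import time


def wait_for_kick(rfd, timeout):
    """Block in select() like an idle event loop; True if woken by a write."""
    readable, _, _ = select.select([rfd], [], [], timeout)
    if readable:
        os.read(rfd, 512)  # swallow the wake-up byte(s)
        return True
    return False


rfd, wfd = os.pipe()

# Another thread writes a single byte after 50 ms to wake the "loop" up.
kicker = threading.Timer(0.05, os.write, args=(wfd, b'A'))
start = time.time()
kicker.start()

kicked = wait_for_kick(rfd, 5.0)  # returns long before the 5 s timeout
elapsed = time.time() - start

kicker.join()
os.close(rfd)
os.close(wfd)
```

Without the write the select() call would sit there for the full timeout, which is the situation a hub is in when something was merely appended to its timer list.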
Next let's look at how you block in a thread. This is actually surprisingly simple, just copy what threading.Condition.wait() does for it: it is perfectly non-blocking for a greenlet to call .release() on a lock which was acquired by another thread. Notice the ugly CPU-consuming spinning which happens when a timeout is in use; luckily this has been fixed in Python 3.2 (issue7316). The last thing of interest here is how the hub does not matter, so we just place None in the set of waiters.

    def twait(self, timeout=None):
        # Needs "import __builtin__" at module level (Python 2)
        waiter = threading.Lock()
        waiter.acquire()
        self._waiters.add((waiter, None))
        if timeout is None:
            # Blocks until .notify() releases the lock
            waiter.acquire()
            return True
        else:
            # Spin around a little, just like the stdlib does
            _time = time.time
            _sleep = time.sleep
            min = __builtin__.min
            endtime = _time() + timeout
            delay = 0.0005      # 500 us -> initial delay of 1 ms
            while True:
                gotit = waiter.acquire(0)
                if gotit:
                    break
                remaining = endtime - _time()
                if remaining <= 0:
                    break
                delay = min(delay * 2, remaining, .05)
                _sleep(delay)
            if not gotit:
                self._waiters.discard((waiter, None))
                return False
            else:
                return True
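Two facts carry the thread case: a threading.Lock has no owner, so any thread may release it, and from Python 3.2 onwards acquire() accepts a timeout so the polling loop is no longer needed. A minimal sketch of both, assuming Python 3.2 or later (the code in this post targets Python 2 and therefore has to spin):

```python
import threading

waiter = threading.Lock()
waiter.acquire()  # the waiting side takes its private lock first

# The notifying side: a different thread releases the lock after 50 ms.
notifier = threading.Timer(0.05, waiter.release)
notifier.start()

# Blocks until the other thread releases, or gives up after 5 seconds.
woken = waiter.acquire(True, 5.0)
notifier.join()

# Nobody releases the lock this time, so the timed acquire fails.
timed_out = not waiter.acquire(True, 0.05)
```

Here woken plays the role of the True return value of .twait() and timed_out corresponds to the False it returns after a timeout.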
Now let's have a look at what notify looks like. We've already discussed what needs to happen here: to notify an eventlet we need to schedule a call with its hub to switch to it (there is no point special-casing when we're already running in that same hub). In case we notified it from a different thread we also need to signal the hub using the pipe set up by .wait() so it will actually start to go round its loop and execute this just-scheduled call. Notifying a thread is even easier: just unlock the lock it's trying to acquire.

    def notify(self):
        if self._waiters:
            waiter, hub = self._waiters.pop()
            if hub is None:
                # This is a waiting thread
                try:
                    waiter.release()
                except thread.error:
                    pass
            else:
                # This is a waiting greenlet
                def notif(waiter):
                    waiter.switch()
                hub.schedule_call_global(0, notif, waiter)
                if hub is not eventlet.hubs.get_hub():
                    self._kick_hub(hub)
Admittedly I've hidden some of the cute trickery away in those ._create_pipe() and ._kick_hub() calls, so let's leave the boring .notify_all() and skip straight to them.
The principle for this is that each eventlet which calls .gwait() needs to ensure there is a pipe available to which a thread can write something. If the hub of the eventlet was waiting for the reading end of this pipe to become readable it will wake up and notice it has to run the notif() function which was scheduled by our call to .notify(). But creating a pipe for each call to .gwait() does seem rather wasteful and this is where the mysterious hubcache comes into play: it is a dictionary keeping track of the pipe associated with each hub.

    def _create_pipe(self, hub):
        if hub in self.hubcache:
            return
        def read_callback(fd):
            os.read(fd, 512)
        rfd, wfd = os.pipe()
        listener = hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
        self.hubcache[hub] = (rfd, wfd, listener)

You can see this asks the hub to wake up when the reading end of the created pipe becomes readable and when this happens read_callback() will be called. The only purpose of read_callback() is to read all the data written to the pipe so that the OS buffers are emptied and the pipe can be re-used to wake the hub up the next time.
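The draining step can be checked in isolation. This is a small standard-library sketch, assuming a Unix-like system: three wake-up bytes are written, and a single read of up to 512 bytes swallows them all so the descriptor stops reporting as readable. The helper is_readable is a hypothetical name for a zero-timeout select() poll.

```python
import os
import select


def is_readable(fd):
    """Poll the descriptor without blocking."""
    return bool(select.select([fd], [], [], 0)[0])


rfd, wfd = os.pipe()

# Three notifications arrive before the loop gets round to reading.
for _ in range(3):
    os.write(wfd, b'A')

before = is_readable(rfd)   # data pending: the loop would wake up
drained = os.read(rfd, 512) # one read collects every pending byte
after = is_readable(rfd)    # pipe is empty again, ready for the next kick

os.close(rfd)
os.close(wfd)
```

Several kicks therefore collapse into one wake-up, which is fine because the scheduled calls, not the bytes, carry the actual notifications.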
Now there is just ._kick_hub() left. This should now be obvious: look up the hub in the hubcache and write some data to the writing end of the pipe. The only gotcha here is that while we might be called from another thread, this does not mean the calling thread itself can't be part of an eventlet mainloop. So in that case make sure not to do a blocking write (as unlikely as that might be).

    def _kick_hub(self, hub):
        rfd, wfd, r_listener = self.hubcache[hub]
        current_hub = eventlet.hubs.get_hub()
        if current_hub.running:
            # We are inside an eventlet ourselves: let our own hub do
            # the write once the pipe is writable
            def write(fd):
                os.write(fd, 'A')
                current_hub.remove(w_listener)
            w_listener = current_hub.add(eventlet.hubs.hub.WRITE, wfd, write)
        else:
            os.write(wfd, 'A')

Notice here how in the async case we remove the listener which was used to wake the hub up right from the callback itself. If we didn't do this then the hub would most likely find the writing end of the pipe writable again on the next loop, which would trigger another notification of the other hub; not what we want!
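A blocking write only becomes a problem once the pipe is full, which is rare but easy to provoke. The sketch below shows a different way to stay non-blocking than the listener approach used here: it puts the write end into non-blocking mode and treats a full pipe as "a wake-up is already pending". It assumes Python 3.5 or later on a Unix-like system (for os.set_blocking and BlockingIOError), and the helper name kick is hypothetical.

```python
import os


def kick(wfd):
    """Write one wake-up byte without ever blocking; True if written."""
    try:
        os.write(wfd, b'A')
        return True
    except BlockingIOError:
        # Pipe is full, so the reader already has wake-ups pending.
        return False


rfd, wfd = os.pipe()
os.set_blocking(wfd, False)

# Fill the pipe completely to provoke the unlikely case.
try:
    while True:
        os.write(wfd, b'x' * 4096)
except BlockingIOError:
    pass

kicked_when_full = kick(wfd)     # would have blocked in blocking mode

os.read(rfd, 1 << 20)            # the reader drains the pipe
kicked_after_drain = kick(wfd)   # there is room again

os.close(rfd)
os.close(wfd)
```

Dropping the byte when the pipe is full is safe for this use because the reader only needs to see that the pipe is readable, not how many bytes were written.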
That's cute, but what now?
We've now got a great way of notifying other threads and eventlets at will. But this is an entirely non-standard tool! Using this is strange, unfamiliar and unwieldy. This isn't one of the synchronisation primitives we know and wanted to use.
But look how easy it is to build a lock now: we just need a real lock and one of these strange notifiers:

    class Lock(object):

        def __init__(self, hubcache=GLOBAL_HUBCACHE):
            self.hubcache = hubcache
            self._notif = Notifier(hubcache)
            self._lock = threading.Lock()
            self.owner = None

        def acquire(self, blocking=True, timeout=None):
            gotit = self._lock.acquire(False)
            if gotit or not blocking:
                if gotit:
                    self.owner = eventlet.getcurrent()
                return gotit
            if timeout is None:
                # Retry after every notification until we win the lock
                while not gotit:
                    self._notif.wait()
                    gotit = self._lock.acquire(False)
                if gotit:
                    self.owner = eventlet.getcurrent()
                return True
            else:
                if timeout < 0:
                    raise RuntimeError(
                        'timeout must be greater than or equal to 0')
                now = time.time()
                end = now + timeout
                while not gotit and (now < end):
                    self._notif.wait(end - now)
                    gotit = self._lock.acquire(False)
                    now = time.time()
                if gotit:
                    self.owner = eventlet.getcurrent()
                return gotit

        __enter__ = acquire

        def release(self):
            self._lock.release()
            self.owner = None
            self._notif.notify()

        def __exit__(self, exc_type, exc_value, traceback):
            self.release()

        def __repr__(self):
            return ('<gsync.Lock object at 0x%x (%r, %r)>' %
                    (id(self), self.owner, self._notif))

Most of this code is to deal with the timeout of the lock. If you only implemented a lock the way it was before Python 3.2, without a timeout, this would have been even simpler. (Oh, also notice the owner attribute, that will come in handy for the next tool.)
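The core of the lock is the "try a non-blocking acquire, otherwise wait for a notification and try again" loop with a deadline. Here is a thread-only sketch of that loop which runs without eventlet. The names CondNotifier and TimedLock are hypothetical, and the stand-in notifier is built from a threading.Condition purely for brevity. One deliberate difference from the code above, and an assumption of this sketch: every wait is capped at a short slice, so a notification that lands between the failed try and the wait only costs a little latency.

```python
import threading
import time


class CondNotifier(object):
    """Hypothetical thread-only stand-in for the Notifier."""

    def __init__(self):
        self._cond = threading.Condition()

    def wait(self, timeout=None):
        with self._cond:
            return self._cond.wait(timeout)

    def notify(self):
        with self._cond:
            self._cond.notify()


class TimedLock(object):
    """A real lock plus a notifier, with an optional acquire timeout."""

    SLICE = 0.05  # longest single wait, in seconds (sketch-only choice)

    def __init__(self):
        self._lock = threading.Lock()
        self._notif = CondNotifier()

    def acquire(self, blocking=True, timeout=None):
        gotit = self._lock.acquire(False)
        if gotit or not blocking:
            return gotit
        end = None if timeout is None else time.time() + timeout
        while not gotit:
            if end is None:
                self._notif.wait(self.SLICE)
            else:
                remaining = end - time.time()
                if remaining <= 0:
                    break  # deadline passed without getting the lock
                self._notif.wait(min(remaining, self.SLICE))
            gotit = self._lock.acquire(False)
        return gotit

    def release(self):
        self._lock.release()
        self._notif.notify()  # let one waiter retry its acquire
```

Calling acquire(timeout=0.1) on a held TimedLock returns False after roughly a tenth of a second, while a release() from any other thread lets a pending acquire() return True.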
Of course now we have a lock we can easily build the next tool: a proper Condition. It's almost like I planned this!

    class Condition(object):

        def __init__(self, lock=None, hubcache=GLOBAL_HUBCACHE):
            if lock is None:
                self._lock = Lock(hubcache=hubcache)
            else:
                self._lock = lock
            self._notif = Notifier(hubcache=hubcache)
            # Export the lock methods
            self.acquire = self._lock.acquire
            self.release = self._lock.release

        # The context-manager methods are defined on the class because
        # the with statement looks them up on the type, not the instance.
        def __enter__(self):
            return self._lock.__enter__()

        def __exit__(self, exc_type, exc_value, traceback):
            return self._lock.__exit__(exc_type, exc_value, traceback)

        def __repr__(self):
            return '<gsync.Condition (%r, %r)>' % (self._lock, self._notif)

        def wait(self, timeout=None):
            if self._lock.owner is not eventlet.getcurrent():
                raise RuntimeError('Can not wait on un-acquired lock')
            self._lock.release()
            try:
                self._notif.wait(timeout)
            finally:
                self._lock.acquire()

        def notify(self):
            if self._lock.owner is not eventlet.getcurrent():
                raise RuntimeError('Can not notify on un-acquired lock')
            self._notif.notify()

        def notify_all(self):
            if self._lock.owner is not eventlet.getcurrent():
                raise RuntimeError('Can not notify on un-acquired lock')
            self._notif.notify_all()

No surprises here. This is truly getting trivial to implement thanks to our previous two primitives.
Now once we have a condition we can finally get to the real prize: a queue to move data freely between threads and eventlets.

    import Queue as queue

    class BaseQueue(object):

        def __init__(self, maxsize=0, hubcache=GLOBAL_HUBCACHE):
            self.hubcache = hubcache
            self.maxsize = maxsize
            self._init(maxsize)
            self.mutex = Lock(hubcache=hubcache)
            self.not_empty = Condition(self.mutex, hubcache=hubcache)
            self.not_full = Condition(self.mutex, hubcache=hubcache)
            self.all_tasks_done = Condition(self.mutex, hubcache=hubcache)
            self.unfinished_tasks = 0

    class Queue(BaseQueue, queue.Queue):
        pass

Great, only had to provide a new .__init__() which uses our own locking primitives; everything else of the stdlib Queue class can be re-used.
But why the strange diversion to create a separate BaseQueue class? Well, it makes making the priority and lifo queues very easy:

    class PriorityQueue(BaseQueue, queue.PriorityQueue):
        pass

    class LifoQueue(BaseQueue, queue.LifoQueue):
        pass

That's right, MRO FTW!
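The method resolution order trick works with any replacement lock, which makes it easy to try out without eventlet. The following sketch targets Python 3 (where the module is called queue) and swaps in a hypothetical CountingLock that merely counts acquisitions. Unlike the BaseQueue above, this variant calls the stdlib initialiser first and then replaces the primitives; that is a choice made for this sketch so that it keeps working if the stdlib initialiser gains new attributes.

```python
import queue
import threading


class CountingLock(object):
    """Hypothetical lock wrapper which counts successful acquisitions."""

    def __init__(self):
        self._lock = threading.Lock()
        self.acquired = 0

    def acquire(self, blocking=True, timeout=-1):
        gotit = self._lock.acquire(blocking, timeout)
        if gotit:
            self.acquired += 1
        return gotit

    def release(self):
        self._lock.release()

    __enter__ = acquire

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()


class BaseQueue(object):

    def __init__(self, maxsize=0):
        # The next class in the MRO is the stdlib queue: it sets up storage.
        super(BaseQueue, self).__init__(maxsize)
        # Now swap in our own primitives, all sharing one mutex.
        self.mutex = CountingLock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)


class PriorityQueue(BaseQueue, queue.PriorityQueue):
    pass


class LifoQueue(BaseQueue, queue.LifoQueue):
    pass
```

Because BaseQueue comes first in the bases, its .__init__() wins, while ._put() and ._get() still resolve to the stdlib priority or lifo implementation further along the MRO.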
Caveats when mixing threads and eventlets
There is one issue to watch out for: imagine a thread which consumes items from a queue, spawning eventlets to do the work.

    class Worker(threading.Thread):

        def __init__(self, inputq):
            threading.Thread.__init__(self)
            self.inputq = inputq

        def run(self):
            eventlet.sleep(0)       # start the hub
            while True:
                item = self.inputq.get()
                if item is PoisonPill:
                    break
                eventlet.spawn(self.do_stuff, item)

Notice that eventlet.sleep(0) line? It basically switches to the hub, thereby implicitly starting it, which then switches back immediately. But why? Remember the code for Notifier.wait(): it checked if the hub was running to detect whether it was being called from inside an eventlet or not. So if you didn't manage to start the hub before calling this method the whole thread will block! Hence the minor hack to start the hub manually beforehand.
All the code
Here is all the code for the notifier in one piece, including the docstrings and comments. This also includes the global hubcache with a tiny bit of extra magic to be able to clear the cache if you so desire. Having this as a parameter to pass in allows you to use different hub caches if you have a reason to do so.
    import __builtin__
    import os
    import thread
    import threading
    import time

    import eventlet
    import eventlet.hubs.hub


    class HubCache(dict):
        """Cache used by Notifier instances

        This is a dict-subclass to overwrite the .clear() method.  Its
        keys are hubs and values are (rfd, wfd, listener).  Using this
        means you can clear the cache in a way which will unregister the
        listeners from the hubs and close all filedescriptors.

        XXX This is hugely incomplete, only remove items from this cache
            using the .clear() method as the other ways of removing items
            will not release resources properly.
        """

        def clear(self):
            while self:
                hub, (rfd, wfd, listener) = self.popitem()
                hub.remove(listener)
                os.close(rfd)
                os.close(wfd)

        def __del__(self):
            self.clear()


    """The global hubcache

    This is the default hubcache used by Notifier instances.
    """
    GLOBAL_HUBCACHE = HubCache()


    class Notifier(object):
        """Notify one or more waiters

        This is essentially a condition without the lock.  It can be used
        to signal between threads and greenlets at will.
        """
        # This doesn't use eventlet.hubs.trampoline since that results in
        # a filedescriptor per waiting greenlet.  Instead each eventlet
        # that calls .gwait() will ensure there's a filedescriptor
        # registered for reading with its hub.  This filedescriptor
        # is then only used when another thread wants to wake up the hub
        # in order for a notification to be delivered to the eventlet.

        def __init__(self, hubcache=GLOBAL_HUBCACHE):
            """Initialise the notifier

            The hubcache is a dictionary which will keep pipes used by the
            notifier so that only ever one pipe gets created per hub.  The
            default is to share this hubcache globally so all notifiers
            use the same pipes for intra-hub communication.
            """
            # Each item in this set is a tuple of (waiter, hub).  For an
            # eventlet the waiter is the greenlet while for a thread it is
            # a lock.  For a thread the hub item is always None.
            self._waiters = set()
            self.hubcache = hubcache

        def wait(self, timeout=None):
            """Wait from a thread or eventlet

            This blocks the current thread/eventlet until it gets woken up
            by a call to .notify() or .notify_all().

            This will automatically dispatch to .gwait() or .twait() as
            needed so that the blocking will be cooperative for greenlets.

            Returns True if this thread/eventlet was notified and False
            when a timeout occurred.
            """
            hub = eventlet.hubs.get_hub()
            if hub.running:
                return self.gwait(timeout)
            else:
                return self.twait(timeout)

        def gwait(self, timeout=None):
            """Wait from an eventlet

            This cooperatively blocks the current eventlet by switching to
            the hub.  The hub will switch back to this eventlet when it
            gets notified.

            Usually you can just call .wait() which will dispatch to this
            method if you are in an eventlet.

            Returns True if this thread/eventlet was notified and False
            when a timeout occurred.
            """
            waiter = eventlet.getcurrent()
            hub = eventlet.hubs.get_hub()
            self._create_pipe(hub)
            self._waiters.add((waiter, hub))
            if timeout and timeout > 0:
                timeout = eventlet.Timeout(timeout)
                try:
                    with timeout:
                        hub.switch()
                except eventlet.Timeout as t:
                    if t is not timeout:
                        raise
                    self._waiters.discard((waiter, hub))
                    return False
                else:
                    return True
            else:
                hub.switch()
                return True

        def twait(self, timeout=None):
            """Wait from a thread

            This blocks the current thread by using a conventional lock.

            Usually you can just call .wait() which will dispatch to this
            method if you are in a thread.

            Returns True if this thread/eventlet was notified and False
            when a timeout occurred.
            """
            waiter = threading.Lock()
            waiter.acquire()
            self._waiters.add((waiter, None))
            if timeout is None:
                waiter.acquire()
                return True
            else:
                # Spin around a little, just like the stdlib does
                _time = time.time
                _sleep = time.sleep
                min = __builtin__.min
                endtime = _time() + timeout
                delay = 0.0005      # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    self._waiters.discard((waiter, None))
                    return False
                else:
                    return True

        def notify(self):
            """Notify one waiter

            This will notify one waiter, regardless of whether it is a
            thread or eventlet, resulting in the waiter returning from
            its .wait() call.

            This will never block itself so can be called from either a
            thread or eventlet itself and will wake up the hub of another
            thread if an eventlet from it is notified.
            """
            if self._waiters:
                waiter, hub = self._waiters.pop()
                if hub is None:
                    # This is a waiting thread
                    try:
                        waiter.release()
                    except thread.error:
                        pass
                else:
                    # This is a waiting greenlet
                    def notif(waiter):
                        waiter.switch()
                    hub.schedule_call_global(0, notif, waiter)
                    if hub is not eventlet.hubs.get_hub():
                        self._kick_hub(hub)

        def notify_all(self):
            """Notify all waiters

            Similar to .notify() but will notify all waiters instead of
            just one.
            """
            for i in xrange(len(self._waiters)):
                self.notify()

        def _create_pipe(self, hub):
            """Create a pipe for a hub

            This creates a pipe (read and write fd) and registers it with
            the hub so that ._kick_hub() can use this to signal the hub.

            This keeps a cache of hubs on ``self.hubcache`` so that only
            one pipe is created per hub.  Furthermore this dict is never
            cleared implicitly to avoid creating new pipes all the time.

            This method is always called from .gwait() and therefore can
            only run once for a given hub at the same time.  Thus it is
            thread-safe.
            """
            if hub in self.hubcache:
                return

            def read_callback(fd):
                # This just reads the (bogus) data just written to empty
                # the os queues.  The only purpose was to kick the hub
                # round its loop, which it now has.  The notif function
                # scheduled by .notify() will now do its work.
                os.read(fd, 512)

            rfd, wfd = os.pipe()
            listener = hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
            self.hubcache[hub] = (rfd, wfd, listener)

        def _kick_hub(self, hub):
            """Kick the hub around its loop

            Threads need to be able to kick a hub around its loop by
            interrupting the sleep.  This is done with the help of a
            filedescriptor to which the thread writes a byte (using this
            method) which will then wake up the hub.
            """
            rfd, wfd, r_listener = self.hubcache[hub]
            current_hub = eventlet.hubs.get_hub()
            if current_hub.running:
                def write(fd):
                    os.write(fd, 'A')
                    current_hub.remove(w_listener)
                w_listener = current_hub.add(eventlet.hubs.hub.WRITE,
                                             wfd, write)
            else:
                os.write(wfd, 'A')

        def __repr__(self):
            return ('<gsync.Notifier object at 0x%x (%d waiters)>' %
                    (id(self), len(self._waiters)))
That's all folks
So it seems that with some careful tinkering you can create all your tried and tested tools to communicate between threads and eventlets or between eventlets in different threads. This makes adopting eventlet into an existing application a whole lot more approachable. It certainly helped me!
6 comments:
Denis said...
Note, that current development version of gevent at https://bitbucket.org/denis/gevent/src (0.14) does support multiple event loops per process. Also, Gevent's API is more familiar to most developers than eventlet's: for example gevent.event.Event has the exact interface and semantics of threading.Event.
Unknown said...
Oh, that's interesting indeed! I guess this is because you are now switching to libev? This will make gevent even more tempting.
Ryan Williams said...
Sheesh Denis, Eventlet also has the same semantics of threading.Event, in eventlet.green.threading.Event. You know this, because you wrote the initial version. I think you are being deceitful. That false "fact" is not even relevant to the post at hand -- can you not post something positive without also trying to slam something else? It is behavior like that that hurts the open-source community.
Ryan Williams said...
To bring things back on topic, thanks Floris, I'm totally replying to you on the mailing list right now. (archive here: https://lists.secondlife.com/pipermail/eventletdev/2011-April/000956.html).
This stuff is pretty cool, and I'm glad to see someone lifting out the core of tpool into something reusable.
Denis said...
It's because we're moving on from libevent1.4 which uses a global variable to store the event loop. libevent2 fixes that and supports multiple event loops too, and this is what I've tried first but in the end I found that I like libev's API better.
Denis said...
Ryan, Here's what I was referring to re Event class https://bitbucket.org/which_linden/eventlet/src/dceaeafcf41e/eventlet/event.py#cl-12 In no way that's compatible with threading.Event
And my remark regarding API difference was in reply to this part of the article:
> "events (though not quite the event you're used too)"
I'd say it's relevant and you'd know that if you read the article before posting a comment rather than immediately starting whining about open source community being hurt. What will your next argument be, "think of the children"? http://en.wikipedia.org/wiki/Children%27s_interests#Usage_to_circumvent_logical_debate