devork

E pur si muove

Synchronising eventlets and threads

Thursday, March 17, 2011

Eventlet is an asynchronous network I/O framework which combines an event loop with greenlet-based coroutines to provide a familiar blocking-like API to the developer. One of the reasons I like eventlet a lot is that the technology it builds on allows its event loop to run inside a thread, or even to run multiple event loops in different threads. This makes it a lot more amenable to slowly evolving existing applications to be more asynchronous than solutions like gevent, which only allow one event loop per process.

Eventlet isn't the most mature of tools, however, and its API shows signs of being developed as needs arose. Not that this is necessarily a bad thing (APIs do need to grow from being used), but don't be surprised if you need to dig down into some parts and discover rough edges (hi IPv6!). The API does have a decent collection of tools you'll be familiar with, however: greenlet-local storage, semaphores, events (though not quite the event you're used to) and even some extra goodies like pools, WSGI servers, DBAPI2 connection pools and ZeroMQ support. But you'll notice that all these goodies are designed to work in a greenlet-only world. And the one place where threads are acknowledged, a global threadpool of workers used as a last resort to make blocking calls behave asynchronously, looks very messy and is not reusable at all (it's full of module globals, for one). So if you're introducing eventlet into an existing application and you need to communicate data and events between threads and eventlets, you'll find a void.

So after some studying of the tpool module I decided to build a class which could synchronise between threads and eventlets. This class, which I called a Notifier, can basically be thought of as a Condition without the lock, i.e. there are three methods: .wait(), .notify() and the rather similar .notify_all(). The idea is that any thread or eventlet which calls .wait() will block (cooperatively block in the case of a greenlet) until it gets woken up by a call to one of the notifying methods. That's all there is to it.
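To pin down that contract, here is a minimal, thread-only sketch in modern Python 3. It is not the gsync code: the name ThreadNotifier is hypothetical, it only handles threads, and it guards its waiter set with a small lock instead of relying on the GIL. Each waiter blocks on its own pre-acquired threading.Lock, which is the same trick the thread side of the Notifier uses further down.

```python
import threading


class ThreadNotifier:
    """Hypothetical thread-only notifier: a Condition without a user lock."""

    def __init__(self):
        self._waiters = set()
        # Protects _waiters; the article's version leans on the GIL instead.
        self._guard = threading.Lock()

    def wait(self, timeout=None):
        """Block until notified; return False if the timeout expires."""
        waiter = threading.Lock()
        waiter.acquire()                 # pre-locked: acquiring again blocks
        with self._guard:
            self._waiters.add(waiter)
        if waiter.acquire(timeout=-1 if timeout is None else timeout):
            return True
        with self._guard:
            if waiter in self._waiters:  # nobody claimed us: a real timeout
                self._waiters.discard(waiter)
                return False
        # notify() popped us right after the timeout; its release is coming.
        waiter.acquire()
        return True

    def notify(self):
        """Wake one arbitrary waiter, if there is one."""
        with self._guard:
            waiter = self._waiters.pop() if self._waiters else None
        if waiter is not None:
            waiter.release()

    def notify_all(self):
        """Wake every current waiter."""
        with self._guard:
            waiters, self._waiters = self._waiters, set()
        for waiter in waiters:
            waiter.release()
```

Note that, like the Notifier, a notification sent while nobody is waiting is simply lost; it is a signal, not a counter.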

Building a Notifier

(Be prepared to look at the source code for eventlet.hubs.hub, threading and related code when reading this.)

Firstly, the class needs a constructor. For now there's only one interesting instance attribute: _waiters, a set which will contain all the threads and eventlets currently blocking on a call to .wait().

def __init__(self, hubcache=GLOBAL_HUBCACHE):
    self._waiters = set()
    self.hubcache = hubcache
Don't worry yet about what goes into the set of waiters and also ignore the hubcache for now. We'll get to those later.

Now let's build the .wait() method: it needs to block until notified. But blocking is significantly different when you're running in a thread than when running in an eventlet. In a thread you want to really block, using the locking primitives provided by the OS (exposed to Python in the thread module), while an eventlet essentially wants to switch to the event loop, called the hub, in the hope that someone will eventually switch back to it when it needs to wake up. These two are so different that they easily divide into two methods: .gwait() for blocking eventlets and .twait() for blocking threads.

def wait(self, timeout=None):
    hub = eventlet.hubs.get_hub()
    if hub.running:
        # Pass on True (notified) or False (timed out)
        return self.gwait(timeout)
    else:
        return self.twait(timeout)
You could call these directly, obviously, but as you can see abstracting them away is not that hard. You can easily detect whether you're in an eventlet by checking if the hub (which is thread-local) is actually running. The price you pay for this is that a hub instance will be created in each thread, even if it is not used. But the worst this does is waste some memory. (This could be avoided by checking for the eventlet.hubs._threadlocal.hub attribute, but that's even more internal than .get_hub().)
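The "one hub per thread" cost comes from the hub being created lazily in thread-local storage. The stdlib-only sketch below mimics that pattern; FakeHub, get_hub(), peek_hub() and in_eventlet() are hypothetical stand-ins, not eventlet's implementation. get_hub() always creates a hub for the calling thread, while peek_hub() only looks, which is the cheaper check the parenthetical above alludes to.

```python
import threading


class FakeHub:
    """Hypothetical stand-in for an eventlet hub."""

    def __init__(self):
        self.running = False   # a real hub sets this while its loop runs


_threadlocal = threading.local()


def get_hub():
    """Return this thread's hub, creating one on first use."""
    hub = getattr(_threadlocal, "hub", None)
    if hub is None:
        hub = _threadlocal.hub = FakeHub()
    return hub


def peek_hub():
    """Return this thread's hub without creating one (None if absent)."""
    return getattr(_threadlocal, "hub", None)


def in_eventlet():
    """Mirror of the dispatch test in wait(): only a running hub counts."""
    hub = peek_hub()
    return hub is not None and hub.running
```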

As already mentioned, the basics of blocking in an eventlet are to switch to the hub and then wait until some other greenlet running in the same thread switches back to you. So a notifier needs a reference to your greenlet instance so it can call .switch() on it when the time comes to wake you up.

But what does another thread do? Well, it turns out another thread can ask your hub to schedule a function to run in your thread using the hub's .schedule_call_global() method, since the only thread-critical operation is an append to the hub's next_timers list, which is a thread-safe operation (list appends are atomic under CPython's GIL).

Now there is another catch: remember that the hub is basically an event loop? Well, if no events happen then it will not be going round its loop! And appending something to a list does not create an event. So what you need to do is use os.pipe() so you have a file descriptor which you can register with the hub; now you can just write some data into this pipe when you want the hub of the waiter to wake up. Setting up this pipe is what the mysterious call to ._create_pipe() does; we'll see it in detail later. The rest is just simple sugar: dealing with timeouts and returning the correct values for them:

def gwait(self, timeout=None):
    waiter = eventlet.getcurrent()
    hub = eventlet.hubs.get_hub()
    self._create_pipe(hub)
    self._waiters.add((waiter, hub))
    if timeout and timeout > 0:
        timeout = eventlet.Timeout(timeout)
        try:
            with timeout:
                hub.switch()
        except eventlet.Timeout as t:
            if t is not timeout:
                raise
            self._waiters.discard((waiter, hub))
            return False
        else:
            return True
    else:
        hub.switch()
        return True
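The combination described above, a thread-safe "schedule this call" list plus a pipe that gets written to so the loop notices, is the classic self-pipe trick. Here is a stdlib-only sketch of it with a toy event loop. MiniHub, schedule(), kick() and stop() are hypothetical names rather than eventlet's API, and using select() on pipes assumes a POSIX system.

```python
import os
import select
import threading


class MiniHub:
    """Toy event loop woken up by writes to a pipe (the self-pipe trick)."""

    def __init__(self):
        self._rfd, self._wfd = os.pipe()
        self._calls = []          # list.append/pop are atomic in CPython
        self._stopped = False

    def schedule(self, func, *args):
        """Queue func(*args) to run inside the loop's thread."""
        self._calls.append((func, args))

    def kick(self):
        """Wake the loop from any thread by making the pipe readable."""
        os.write(self._wfd, b"A")

    def stop(self):
        self.schedule(setattr, self, "_stopped", True)
        self.kick()

    def run(self):
        while not self._stopped:
            # No timeout: without a kick this select() never returns.
            readable, _, _ = select.select([self._rfd], [], [])
            if self._rfd in readable:
                os.read(self._rfd, 512)   # drain so the pipe can signal again
            while self._calls:
                func, args = self._calls.pop(0)
                func(*args)
        os.close(self._rfd)
        os.close(self._wfd)
```

Appending to the list alone would leave the loop asleep in select(); it is the write in kick(), always done after schedule(), that guarantees the call gets picked up.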

Next, let's look at how you block in a thread. This is actually surprisingly simple: just copy what threading.Condition.wait() does. It is perfectly fine, and non-blocking, for a greenlet or another thread to call .release() on a lock which was acquired by a different thread. Notice the ugly CPU-consuming spinning which happens when a timeout is in use; luckily this has been fixed in Python 3.2 (issue7316).

def twait(self, timeout=None):
    waiter = threading.Lock()
    waiter.acquire()
    self._waiters.add((waiter, None))
    if timeout is None:
        waiter.acquire()
        return True
    else:
        # Spin around a little, just like the stdlib does
        _time = time.time
        _sleep = time.sleep
        endtime = _time() + timeout
        delay = 0.0005      # 500 us -> initial delay of 1 ms
        while True:
            gotit = waiter.acquire(0)
            if gotit:
                break
            remaining = endtime - _time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, .05)
            _sleep(delay)
        if not gotit:
            self._waiters.discard((waiter, None))
            return False
        else:
            return True
The last thing of interest here is that the hub does not matter for a waiting thread, so we just place None in the set of waiters.
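The polling loop in .twait() is the pre-3.2 stdlib strategy. Pulled out into a standalone Python 3 function, next to the native timeout that issue7316 added in Python 3.2, it looks roughly like this. spin_acquire() and native_acquire() are hypothetical helper names, and the sketch uses time.monotonic() rather than time.time() so that clock changes can't skew the deadline.

```python
import threading
import time


def spin_acquire(lock, timeout):
    """Poll lock.acquire(False) with exponential backoff, pre-3.2 style."""
    endtime = time.monotonic() + timeout
    delay = 0.0005                      # doubled before the first sleep: 1 ms
    while True:
        if lock.acquire(False):
            return True
        remaining = endtime - time.monotonic()
        if remaining <= 0:
            return False
        delay = min(delay * 2, remaining, 0.05)   # never sleep more than 50 ms
        time.sleep(delay)


def native_acquire(lock, timeout):
    """Python 3.2+: the OS does the waiting, no polling loop needed."""
    return lock.acquire(timeout=timeout)
```

The backoff caps the wake-up latency at 50 ms while keeping the CPU cost of a long wait low, but it is still polling; the native version simply sleeps until the lock is released or the timeout expires.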

Now let's have a look at what notify looks like. We've already discussed what needs to happen here: to notify an eventlet we need to schedule a call with its hub to switch to it (there's no point special-casing the situation where we're already running in that same hub). If we notified it from a different thread, we also need to signal the hub using the pipe set up by .gwait(), so that it will actually go round its loop and execute the call we just scheduled. Notifying a thread is even easier: just release the lock it's trying to acquire.

def notify(self):
    if self._waiters:
        waiter, hub = self._waiters.pop()
        if hub is None:
            # This is a waiting thread
            try:
                waiter.release()
            except thread.error:
                pass
        else:
            # This is a waiting greenlet
            def notif(waiter):
                waiter.switch()
            hub.schedule_call_global(0, notif, waiter)
            if hub is not eventlet.hubs.get_hub():
                self._kick_hub(hub)

Admittedly I've hidden some of the cute trickery away in those ._create_pipe() and ._kick_hub() calls, so let's leave the boring .notify_all() and skip straight to them.

The principle for this is that each eventlet which calls .gwait() needs to ensure there is a pipe available to which a thread can write something. If the hub of the eventlet was waiting for the reading end of this pipe to become readable it will wake up and notice it has to run the notif() function which was scheduled by our call to .notify(). But creating a pipe for each call to .gwait() does seem rather wasteful and this is where the mysterious hubcache comes into play: it is a dictionary keeping track of the pipe associated with each hub.

def _create_pipe(self, hub):
    if hub in self.hubcache:
        return
    def read_callback(fd):
        os.read(fd, 512)
    rfd, wfd = os.pipe()
    listener = hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
    self.hubcache[hub] = (rfd, wfd, listener)
You can see this asks the hub to wake up when the reading end of the created pipe becomes readable and when this happens read_callback() will be called. The only purpose of read_callback() is to read all the data written to the pipe so that the OS buffers are emptied and the pipe can be re-used to wake the hub up the next time.
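Why the drain matters: pipe readiness is level-triggered, so an event loop polling the read end keeps seeing it as readable until the bytes are consumed. A small POSIX-only check with select() shows this; the helper name readable() is just for illustration.

```python
import os
import select


def readable(fd):
    """True if fd has data waiting (zero timeout: poll, don't block)."""
    r, _, _ = select.select([fd], [], [], 0)
    return fd in r


rfd, wfd = os.pipe()
os.write(wfd, b"A")
before = [readable(rfd), readable(rfd)]   # still readable on every poll
os.read(rfd, 512)                        # what read_callback() does
after = readable(rfd)                    # drained: no longer readable
os.close(rfd)
os.close(wfd)
```

Without the read, every pass of the hub's loop would fire read_callback() again, and the pipe could no longer signal anything new.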

Now there is just ._kick_hub() left. This should now be obvious: look up the hub in the hubcache and write some data to the writing end of the pipe. The only gotcha is that although we are called from a thread other than the waiter's, the calling thread may itself be running an eventlet main loop. In that case make sure not to do a blocking write (as unlikely as blocking might be) and instead let our own hub do the write once the pipe is writable.

def _kick_hub(self, hub):
    rfd, wfd, r_listener = self.hubcache[hub]
    current_hub = eventlet.hubs.get_hub()
    if current_hub.running:
        def write(fd):
            os.write(fd, 'A')
            current_hub.remove(w_listener)
        w_listener = current_hub.add(eventlet.hubs.hub.WRITE, wfd, write)
    else:
        os.write(wfd, 'A')
Notice how, in the async case, we remove the listener which was used to wake the hub up right from the callback itself. If we didn't do this, the hub would most likely find the writing end of the pipe writable again on the next loop, which would trigger another notification of the other hub: not what we want!
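Both halves of that reasoning can be checked with plain select() on a POSIX system: the write end of a pipe with room in its buffer is reported writable on every poll, which is why the one-shot WRITE listener removes itself, and the buffer is finite, which is why a blocking write is possible at all. The helper name writable() is only illustrative; os.set_blocking() is the stdlib call (Python 3.5+).

```python
import os
import select


def writable(fd):
    """True if a write to fd would not block right now."""
    _, w, _ = select.select([], [fd], [], 0)
    return fd in w


rfd, wfd = os.pipe()
# A pipe with buffer space is writable on every poll, so a WRITE
# listener left registered would fire on every pass of the loop.
always_ready = [writable(wfd) for _ in range(3)]

# The buffer is finite, though: in non-blocking mode, filling it ends in
# BlockingIOError, which is exactly where a blocking write would stall.
os.set_blocking(wfd, False)
written = 0
try:
    while True:
        written += os.write(wfd, b"A" * 512)
except BlockingIOError:
    pass
os.close(rfd)
os.close(wfd)
```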

That's cute, but what now?

We've now got a great way of notifying other threads and eventlets at will. But this is an entirely non-standard tool! Using it is strange, unfamiliar and unwieldy. This isn't one of the synchronisation primitives we know and want to use.

But look how easy it is to build a lock now: we just need a real lock and one of these strange notifiers:

class Lock(object):

    def __init__(self, hubcache=GLOBAL_HUBCACHE):
        self.hubcache = hubcache
        self._notif = Notifier(hubcache)
        self._lock = threading.Lock()
        self.owner = None

    def acquire(self, blocking=True, timeout=None):
        gotit = self._lock.acquire(False)
        if gotit or not blocking:
            if gotit:
                self.owner = eventlet.getcurrent()
            return gotit
        if timeout is None:
            while not gotit:
                self._notif.wait()
                gotit = self._lock.acquire(False)
            if gotit:
                self.owner = eventlet.getcurrent()
                return True
        else:
            if timeout < 0:
                raise RuntimeError('timeout must be greater than or equal to 0')
            now = time.time()
            end = now + timeout
            while not gotit and (now < end):
                self._notif.wait(end - now)
                gotit = self._lock.acquire(False)
                now = time.time()
            if gotit:
                self.owner = eventlet.getcurrent()
            return gotit

    __enter__ = acquire

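    def release(self):
        # Clear the owner first: once the real lock is released another
        # thread or eventlet may grab it and set its own owner.
        self.owner = None
        self._lock.release()
        self._notif.notify()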

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()

    def __repr__(self):
        return ('<gsync.Lock object at 0x%x (%r, %r)>' %
                (id(self), self.owner, self._notif))
Most of this code deals with the timeout of the lock. If you only implemented a lock as it was before Python 3.2, without the timeout argument, this would have been even simpler. (Oh, also notice the owner attribute; that will come in handy for the next tool.)
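The retry loop at the heart of this Lock can be tried out with threads alone. In the sketch below (a hypothetical RetryLock, Python 3, threads only) a stdlib threading.Condition stands in for the Notifier. Because the waiter holds the condition's own lock between its failed try-acquire and wait(), a concurrent release cannot slip its notification into that gap and be missed. It tracks no owner and uses time.monotonic() for the deadline.

```python
import threading
import time


class RetryLock:
    """Hypothetical thread-only lock: try-acquire, else wait to be notified."""

    def __init__(self):
        self._lock = threading.Lock()
        # Stand-in for the article's Notifier.
        self._cond = threading.Condition()

    def acquire(self, blocking=True, timeout=None):
        if self._lock.acquire(False):
            return True
        if not blocking:
            return False
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        end = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            # Holding the condition's lock between the failed try and
            # wait() means release() cannot notify into that gap.
            while not self._lock.acquire(False):
                if end is None:
                    self._cond.wait()
                    continue
                remaining = end - time.monotonic()
                if remaining <= 0:
                    return False
                self._cond.wait(remaining)
        return True

    def release(self):
        self._lock.release()
        with self._cond:
            self._cond.notify()
```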

Of course now we have a lock we can easily build the next tool: a proper Condition. It's almost like I planned this!

class Condition(object):

    def __init__(self, lock=None, hubcache=GLOBAL_HUBCACHE):
        if lock is None:
            self._lock = Lock(hubcache=hubcache)
        else:
            self._lock = lock
        self._notif = Notifier(hubcache=hubcache)

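        # Export the lock methods
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    # Python 2.7+ looks up __enter__/__exit__ on the class, not the
    # instance, so they have to be real methods to make "with" work.
    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        return self._lock.__exit__(exc_type, exc_value, traceback)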

    def __repr__(self):
        return '<gsync.Condition (%r, %r)>' % (self._lock, self._notif)

    def wait(self, timeout=None):
        if self._lock.owner is not eventlet.getcurrent():
            raise RuntimeError('Can not wait on un-acquired lock')
        self._lock.release()
        try:
            return self._notif.wait(timeout)
        finally:
            self._lock.acquire()

    def notify(self):
        if self._lock.owner is not eventlet.getcurrent():
            raise RuntimeError('Can not notify on un-acquired lock')
        self._notif.notify()

    def notify_all(self):
        if self._lock.owner is not eventlet.getcurrent():
            raise RuntimeError('Can not notify on un-acquired lock')
        self._notif.notify_all()
No surprises here. This is truly getting trivial to implement thanks to our previous two primitives.
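These ownership checks mirror what the stdlib does. With Python 3's threading.Condition, notifying or waiting without holding the lock raises RuntimeError, and since Python 3.2 wait() returns a boolean that tells a notification apart from a timeout. The helper error_name() below is only for the demonstration.

```python
import threading

cond = threading.Condition()


def error_name(func, *args):
    """Call func and report which exception (if any) it raised."""
    try:
        func(*args)
    except RuntimeError as exc:
        return type(exc).__name__
    return None


unowned_notify = error_name(cond.notify)     # not holding the lock
unowned_wait = error_name(cond.wait, 0.01)   # not holding the lock

with cond:
    # Nobody calls notify(), so wait() times out and returns False.
    timed_out = cond.wait(timeout=0.01)
```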

Now that we have a condition we can finally get to the real prize: a queue to move data freely between threads and eventlets.

import Queue as queue

class BaseQueue(object):

    def __init__(self, maxsize=0, hubcache=GLOBAL_HUBCACHE):
        self.hubcache = hubcache
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = Lock(hubcache=hubcache)
        self.not_empty = Condition(self.mutex, hubcache=hubcache)
        self.not_full = Condition(self.mutex, hubcache=hubcache)
        self.all_tasks_done = Condition(self.mutex, hubcache=hubcache)
        self.unfinished_tasks = 0

class Queue(BaseQueue, queue.Queue):
    pass
Great: we only had to provide a new .__init__() which uses our own locking primitives; everything else in the stdlib Queue class can be reused.

But why the strange diversion to create a separate BaseQueue class? Well, it makes creating the priority and LIFO queues very easy:

class PriorityQueue(BaseQueue, queue.PriorityQueue):
    pass

class LifoQueue(BaseQueue, queue.LifoQueue):
    pass
That's right, MRO FTW!
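The same mixin trick can be tried against Python 3's queue module without eventlet. In this sketch CountingCondition is a hypothetical Condition subclass that counts notifications, just to prove the stdlib methods really use the swapped-in primitives. Unlike the article's BaseQueue it first calls the stdlib __init__ through super() and only then replaces the locks, an assumption made so that any attributes a newer Python version sets up in __init__ still exist.

```python
import queue
import threading


class CountingCondition(threading.Condition):
    """Hypothetical Condition that counts notify() calls."""

    notifications = 0

    def notify(self, n=1):
        CountingCondition.notifications += 1
        super().notify(n)


class BaseQueue:
    """Mixin that swaps the stdlib queue's locking primitives for our own."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)       # next in the MRO: queue.Queue
        self.mutex = threading.Lock()
        self.not_empty = CountingCondition(self.mutex)
        self.not_full = CountingCondition(self.mutex)
        self.all_tasks_done = CountingCondition(self.mutex)


class Queue(BaseQueue, queue.Queue):
    pass


class LifoQueue(BaseQueue, queue.LifoQueue):
    pass
```

Because BaseQueue comes first in each class's MRO, its __init__ wins, while _init(), _put() and _get() still resolve to the LIFO (or FIFO) implementations further along the chain.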

Caveats when mixing threads and eventlets

There is one issue to watch out for: imagine a thread which consumes items from a queue, spawning eventlets to do the work.

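PoisonPill = object()  # sentinel: put this on the queue to stop a worker

class Worker(threading.Thread):
    def __init__(self, inputq):
        threading.Thread.__init__(self)  # required before .start() works
        self.inputq = inputq

    def run(self):
        eventlet.sleep(0)  # start the hub
        while True:
            item = self.inputq.get()
            if item is PoisonPill:
                break
            eventlet.spawn(self.do_stuff, item)

    def do_stuff(self, item):
        pass  # placeholder for the real per-item work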
Notice that eventlet.sleep(0) line? It basically switches to the hub, thereby implicitly starting it, which then switches back immediately. But why?

Remember that Notifier.wait() checks whether the hub is running to detect whether it is being called from inside an eventlet or not. So if you didn't manage to start the hub before calling this method, it takes the thread path and the whole thread blocks, so none of the eventlets spawned in it ever get to run! Hence the minor hack to start the hub manually beforehand.

All the code

Here is all the code for the notifier in one piece, including the docstrings and comments. This also includes the global hubcache with a tiny bit of extra magic to be able to clear the cache if you so desire. Having the hubcache as a parameter allows you to use different hub caches if you have a reason to do so.

import os
import thread
import threading
import time

import eventlet
import eventlet.hubs
import eventlet.hubs.hub


class HubCache(dict):
    """Cache used by Notifier instances

    This is a dict-subclass to override the .clear() method.  Its
    keys are hubs and values are (rfd, wfd, listener).  Using this
    means you can clear the cache in a way which will unregister the
    listeners from the hubs and close all file descriptors.

    XXX This is hugely incomplete, only remove items from this cache
        using the .clear() method as the other ways of removing items
        will not release resources properly.
    """

    def clear(self):
        while self:
            hub, (rfd, wfd, listener) = self.popitem()
            hub.remove(listener)
            os.close(rfd)
            os.close(wfd)

    def __del__(self):
        self.clear()


"""The global hubcache

This is the default hubcache used by Notifier instances.
"""
GLOBAL_HUBCACHE = HubCache()


class Notifier(object):
    """Notify one or more waiters

    This is essentially a condition without the lock.  It can be used
    to signal between threads and greenlets at will.
    """
    # This doesn't use eventlet.hubs.trampoline since that results in
    # a file descriptor per waiting greenlet.  Instead each eventlet
    # that calls .gwait() will ensure there's a file descriptor
    # registered for reading with its hub.  This file descriptor
    # is then only used when another thread wants to wake up the hub
    # in order for a notification to be delivered to the eventlet.

    def __init__(self, hubcache=GLOBAL_HUBCACHE):
        """Initialise the notifier

        The hubcache is a dictionary which will keep pipes used by the
        notifier so that only one pipe ever gets created per hub.  The
        default is to share this hubcache globally so all notifiers
        use the same pipes for waking up hubs from other threads.
        """
        # Each item in this set is a tuple of (waiter, hub).  For an
        # eventlet the waiter is the greenlet while for a thread it is
        # a lock.  For a thread the hub item is always None.
        self._waiters = set()
        self.hubcache = hubcache

    def wait(self, timeout=None):
        """Wait from a thread or eventlet

        This blocks the current thread/eventlet until it gets woken up
        by a call to .notify() or .notify_all().

        This will automatically dispatch to .gwait() or .twait() as
        needed so that the blocking will be cooperative for greenlets.

        Returns True if this thread/eventlet was notified and False
        when a timeout occurred.
        """
        hub = eventlet.hubs.get_hub()
        if hub.running:
            return self.gwait(timeout)
        else:
            return self.twait(timeout)

    def gwait(self, timeout=None):
        """Wait from an eventlet

        This cooperatively blocks the current eventlet by switching to
        the hub.  The hub will switch back to this eventlet when it
        gets notified.

        Usually you can just call .wait() which will dispatch to this
        method if you are in an eventlet.

        Returns True if this thread/eventlet was notified and False
        when a timeout occurred.
        """
        waiter = eventlet.getcurrent()
        hub = eventlet.hubs.get_hub()
        self._create_pipe(hub)
        self._waiters.add((waiter, hub))
        if timeout and timeout > 0:
            timeout = eventlet.Timeout(timeout)
            try:
                with timeout:
                    hub.switch()
            except eventlet.Timeout as t:
                if t is not timeout:
                    raise
                self._waiters.discard((waiter, hub))
                return False
            else:
                return True
        else:
            hub.switch()
            return True

    def twait(self, timeout=None):
        """Wait from a thread

        This blocks the current thread by using a conventional lock.

        Usually you can just call .wait() which will dispatch to this
        method if you are not in an eventlet.

        Returns True if this thread/eventlet was notified and False
        when a timeout occurred.
        """
        waiter = threading.Lock()
        waiter.acquire()
        self._waiters.add((waiter, None))
        if timeout is None:
            waiter.acquire()
            return True
        else:
            # Spin around a little, just like the stdlib does
            _time = time.time
            _sleep = time.sleep
            endtime = _time() + timeout
            delay = 0.0005      # 500 us -> initial delay of 1 ms
            while True:
                gotit = waiter.acquire(0)
                if gotit:
                    break
                remaining = endtime - _time()
                if remaining <= 0:
                    break
                delay = min(delay * 2, remaining, .05)
                _sleep(delay)
            if not gotit:
                self._waiters.discard((waiter, None))
                return False
            else:
                return True

    def notify(self):
        """Notify one waiter

        This will notify one waiter, regardless of whether it is a
        thread or eventlet, resulting in the waiter returning from
        its .wait() call.

        This will never block itself so can be called from either a
        thread or eventlet itself and will wake up the hub of another
        thread if an eventlet from it is notified.
        """
        if self._waiters:
            waiter, hub = self._waiters.pop()
            if hub is None:
                # This is a waiting thread
                try:
                    waiter.release()
                except thread.error:
                    pass
            else:
                # This is a waiting greenlet
                def notif(waiter):
                    waiter.switch()
                hub.schedule_call_global(0, notif, waiter)
                if hub is not eventlet.hubs.get_hub():
                    self._kick_hub(hub)

    def notify_all(self):
        """Notify all waiters

        Similar to .notify() but will notify all waiters instead of
        just one.
        """
        for i in xrange(len(self._waiters)):
            self.notify()

    def _create_pipe(self, hub):
        """Create a pipe for a hub

        This creates a pipe (read and write fd) and registers it with
        the hub so that ._kick_hub() can use this to signal the hub.

        This keeps a cache of hubs on ``self.hubcache`` so that only
        one pipe is created per hub.  Furthermore this dict is never
        cleared implicitly to avoid creating new pipes all the time.

        This method is always called from .gwait() and therefore can
        only run once for a given hub at the same time.  Thus it is
        thread-safe.
        """
        if hub in self.hubcache:
            return
        def read_callback(fd):
            # This just reads the (bogus) data just written to empty
            # the OS buffers.  The only purpose was to kick the hub
            # round its loop, which it now has.  The notif function
            # scheduled by .notify() will now do its work.
            os.read(fd, 512)
        rfd, wfd = os.pipe()
        listener = hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
        self.hubcache[hub] = (rfd, wfd, listener)

    def _kick_hub(self, hub):
        """Kick the hub around its loop

        Threads need to be able to kick a hub around its loop by
        interrupting the sleep.  This is done with the help of a
        file descriptor to which the thread writes a byte (using this
        method) which will then wake up the hub.
        """
        rfd, wfd, r_listener = self.hubcache[hub]
        current_hub = eventlet.hubs.get_hub()
        if current_hub.running:
            def write(fd):
                os.write(fd, 'A')
                current_hub.remove(w_listener)
            w_listener = current_hub.add(eventlet.hubs.hub.WRITE, wfd, write)
        else:
            os.write(wfd, 'A')

    def __repr__(self):
        return ('<gsync.Notifier object at 0x%x (%d waiters)>' %
                (id(self), len(self._waiters)))

That's all folks

So it seems that with some careful tinkering you can create all your tried and tested tools for communicating between threads and eventlets, or between eventlets in different threads. This makes adopting eventlet in an existing application a whole lot more approachable. It certainly helped me!
