devork

E pur si muove

Selectable queue

Saturday, May 29, 2010

Sometimes you want to use something like select.select() on Queues. A bit of searching shows that this question has been answered for multiprocessing Queues, where you can simply use the real select on the underlying socket (IIRC), but for a good old-fashioned Queue you're stuck.

Now it's easy to argue that this isn't much of a need: when I wanted this a while ago it turned out to be surprisingly simple to restructure the design a little so that I no longer needed a selectable queue. But it has hung around the back of my mind ever since, so I've kept thinking about it. My conclusion for now (which I haven't bothered implementing) is that simply cloning the normal queues but replacing the not_empty and not_full Conditions with Events gives you selectable queues. It changes the semantics slightly, but that doesn't seem harmful. This obviously isn't enough on its own, so the second change to the queue is that you should be allowed to pass in the events to use. Now that you can share the not_empty event (or not_full, for the write side) between two queues, you can simply wait on this event and you have your select.

Next time I want this I might actually implement this idea rather than redesign so that I don't want selectable queues anymore.
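
For illustration only, here is a rough sketch of the shape such a queue might take. It assumes Python 3, is unbounded (so only not_empty is shown, not not_full), and the EventQueue name and its not_empty constructor argument are made up for this example; it is not a clone of the standard Queue.

```python
import collections
import threading


class EventQueue:
    """Hypothetical unbounded FIFO that signals 'not empty' through an Event."""

    def __init__(self, not_empty=None):
        self._items = collections.deque()
        self._lock = threading.Lock()
        # Assumption of this sketch: callers may pass in a shared Event.
        self.not_empty = threading.Event() if not_empty is None else not_empty

    def put(self, item):
        with self._lock:
            self._items.append(item)
        # Set after the item is stored, so a woken waiter always finds it.
        self.not_empty.set()

    def get_nowait(self):
        with self._lock:
            return self._items.popleft()  # raises IndexError when empty

    def empty(self):
        with self._lock:
            return not self._items


# Two queues sharing one event: waiting on the event waits on both queues.
shared = threading.Event()
a = EventQueue(not_empty=shared)
b = EventQueue(not_empty=shared)

# A producer thread puts an item into b shortly after we start waiting.
threading.Timer(0.05, b.put, args=("hello",)).start()

shared.wait(timeout=5)
ready = [q for q in (a, b) if not q.empty()]
```

The sketch never clears the event by itself. Clearing inside get_nowait() would hide items still waiting in a sibling queue, so a consumer would have to clear() the shared event itself before rescanning the queues and waiting again.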

6 comments:

Jean-Paul Calderone said...

Can you explain what you mean in a bit more detail? The way I interpret your idea, you're suggesting that this doesn't happen:

>>> import threading
>>> import select
>>> select.select([threading.Event()], [], [], 0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: argument must be an int, or have a fileno() method.
>>>

Is there something else you mean when you say "Event"?

Floris Bruynooghe said...

I never meant that the normal select would work. I meant that once you replace the not_empty condition in a Queue with an event, you can write a new select-like function which would look something like this (only rlist is done here; wlist would work the same way with not_full):

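def qselect(rlist, timeout=None):
    # Every queue in rlist must share one not_empty Event.
    event = rlist[0].not_empty
    for q in rlist:
        assert q.not_empty is event
    # Sleep until some queue signals an item or the timeout expires.
    event.wait(timeout)
    # Collect the queues that actually hold items.
    rready = []
    for q in rlist:
        if not q.empty():
            rready.append(q)
    return rready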

(you may want to look at optimising locking here, yada yada)

Floris Bruynooghe said...

Queue.get(timeout) still only works on one queue; my (hypothetical) qselect will return as soon as any one of the queues has an item in it. Looping over the queues with a .get(timeout) seems wasteful (the thread doesn't sleep as much as it could) and adds a delay.

Calvin Spealman said...

I've always thought that it would be good (or, at least, interesting) if there was a way to plug additional things into select. If you could define some API that an object could present to tell select() how to wait for it, maybe just by producing a fileno it will write to when an event happens, specifically for select() to work, then you could pass it all sorts of things.

Jean-Paul Calderone said...

> I've always thought that it would be good (or, at least, interesting) if there was a way to plug additional things into select. If you could define some API that an object could present to tell select() how to wait for it, maybe just by producing a fileno it will write to when an event happens, specifically for select() to work, then you could pass it all sorts of things.

Well, you can, can't you? Just do that thing you said with filenos, and then pass the fileno to select.

Alternatively, create an object with a fileno method, and pass that object to select(). Then you'll even get the object back out again, instead of a fileno that you have to associate back with your original object.

So you *could* make a select.select()-able Queue, if you associate a pipe with it and override each method that puts an object into it to also write to the write end of the pipe. Then select on the queue and you'll get told when something was put into it. This is sort of how Twisted's threading support works, by the way (there's no actual Queue object, but there's a list and a pipe, and whenever something is appended to the list, the pipe is written to, too).
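
A minimal sketch of that pipe approach, assuming Python 3 on a Unix-like system (on Windows, select.select() only accepts sockets). The PipeQueue name and its close() method are made up for this illustration, and this is not Twisted's code.

```python
import os
import queue
import select


class PipeQueue(queue.Queue):
    """Hypothetical queue.Queue subclass that select.select() can wait on."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # The read end becomes readable whenever the queue holds items.
        self._rfd, self._wfd = os.pipe()

    def fileno(self):
        # select.select() accepts any object with a fileno() method.
        return self._rfd

    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        os.write(self._wfd, b"x")  # one byte per queued item

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        os.read(self._rfd, 1)  # consume the byte that matches this item
        return item

    def close(self):
        os.close(self._rfd)
        os.close(self._wfd)


q = PipeQueue()
before, _, _ = select.select([q], [], [], 0)   # nothing queued yet
q.put("job")
after, _, _ = select.select([q], [], [], 1.0)  # select hands back q itself
item = q.get()
q.close()
```

put_nowait() and get_nowait() go through put() and get(), so they are covered as well. The pipe buffer is finite, so a producer that runs far ahead of the consumer can end up blocking in os.write().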
