Celery for Twisted: manage Celery tasks from twisted using the Deferred API
Celery is an outstanding choice for dispatching short-lived, computationally expensive tasks to a distributed backend system. Note the emphasis on short-lived: Celery is ill-suited for tasks that require updating some in-memory representation with out-of-process data. If you want a specific process to read data from standard input, for instance, good luck...
Twisted can be thought of as having the opposite problem. Twisted is very good at maintaining and updating in-memory representations over extended periods of time, but fails miserably at performing expensive computations, because a long computation blocks its single-threaded event loop (the reactor). Twisted notably has no built-in constructs for managing distributed task queues.
As its name suggests, txCelery elegantly couples these two frameworks together, and in so doing allows them to complement each other. Developers can now create long-running processes whose expensive subroutines can be farmed out to a distributed computational cluster.
And best of all, txCelery fully leverages Twisted's Deferred API, so there's no need to drink yet another framework's Kool-Aid.
Note: These instructions assume you have a working installation of Celery.
The recommended way of installing txCelery is through pip. PyPI hosts the latest stable version of txCelery.
First, install pip. On Debian/Ubuntu systems, this is achieved with the sudo apt-get install python-pip command.
Next, let's install the latest stable version of txCelery:
- pip install txCelery --user to install for your user only
- sudo pip install txCelery to install system-wide
The latest development files can be obtained by cloning the GitHub repository, checking out the dev branch, and running python setup.py develop --user. We strongly recommend that you do not use the development version in production.
txCelery's API is so simple it brings tears to our eyes. There are exactly one and a half constructs. Yes, one and one half.
In order to use a Celery task with Twisted, you must wrap your Celery task with a DeferrableTask-class decorator. In your tasks.py (or wherever you keep your Celery tasks):
from celery import Celery
from txcelery.defer import DeferrableTask
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@DeferrableTask
@app.task
def my_task(*args, **kw):
    # do something
    pass

There's just one thing to bear in mind: contrary to the Celery documentation's insistence that @app.task be the top-most decorator in your function definition, DeferrableTask must sit above @app.task. DeferrableTask expects to wrap a Celery task and will raise a TypeError if it receives anything else.
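The ordering rule follows from how Python applies stacked decorators: bottom-up, so the decorator written closest to def runs first and the top-most one receives its result. The stdlib-only sketch below illustrates this; FakeTask, fake_app_task, and FakeDeferrableTask are hypothetical stand-ins for a Celery task, @app.task, and DeferrableTask, and the type check only mimics the TypeError described above rather than reproducing txCelery's code.

```python
class FakeTask:
    """Hypothetical stand-in for a Celery task object."""
    def __init__(self, fn):
        self.fn = fn


def fake_app_task(fn):
    """Hypothetical stand-in for @app.task: turns a function into a task."""
    return FakeTask(fn)


class FakeDeferrableTask:
    """Hypothetical stand-in for DeferrableTask: accepts only task objects."""
    def __init__(self, task):
        if not isinstance(task, FakeTask):
            raise TypeError("expected a task, got %r" % type(task).__name__)
        self.task = task


# Correct order: fake_app_task is applied first, so the wrapper gets a FakeTask.
@FakeDeferrableTask
@fake_app_task
def good(x):
    return x * 2


# Wrong order: the wrapper is applied first, receives a plain function,
# and rejects it before fake_app_task ever runs.
try:
    @fake_app_task
    @FakeDeferrableTask
    def bad(x):
        return x * 2
except TypeError as exc:
    error_message = str(exc)
```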
Once you've wrapped your task with the DeferrableTask-class decorator, you'll find all the usual task methods like delay, apply_async, subtask, chain, etc. The difference is that those which used to return a celery.result.AsyncResult will now return a twisted.internet.defer.Deferred instance when they are called (ok, actually a subclass of Deferred, but more on that in a second).
So what of this subclass of Twisted's Deferred? It can be thought of as a Deferred that also gives transparent access to all the attributes and methods of its associated AsyncResult instance. It can be thought of in those terms because that's exactly what it is, and that's why this part of the API only constitutes half of a thing to learn.
Our subclass is called DeferredTask. It lives in txcelery.defer, and as far as Twisted is concerned it's just a plain old Deferred: DeferredTasks can be chained, passed to maybeDeferred, joined via gatherResults and DeferredList, and so on.
DeferredTask monitors the state of the task and fires its callback chain if the task succeeds, or its errback chain if the task fails. If the task is revoked, DeferredTask fires its errback chain with a twisted.internet.defer.CancelledError as its Failure value.
- Wrap a task with a DeferrableTask
- Call task methods and obtain a DeferredTask instance in lieu of an AsyncResult
- Use DeferredTask as if it were a regular Deferred or a regular AsyncResult
And that's really all there is to it.