python - Plotting Celery canvas before running
There is documentation on how to produce a graph after running a canvas job in Celery. I'd like to generate a graph before I run the job.
Say I created a simple chain:
    c = chain(add.s(1, 2), mul.s(4))
How can I generate a graph of this chain?
Thanks,
Miki
I had the exact same desire: generate the graph before running the job. I worked a bit on it :)
It appears that Celery does not allow it. The reason (at least as I understood it when trying) is that in a graph each node has to have a unique name. Once the canvas is executed, that unique name is the Celery task_id, but before execution there is nothing that allows such a distinction.
So the solution is to generate the graph yourself and, of course, to identify each node uniquely (for that, a counter can work).
This is the job of the following function:
    # -*- coding: utf-8 -*-
    from celery.canvas import chain, group, Signature


    def analyze_canvas(canvas):
        """Return a list of (node_name, [names of the nodes it depends on])."""
        return _analyze_canvas(canvas)['dependencies']


    def _analyze_canvas(canvas, previous=None, i=0):
        # `previous` holds the nodes the next task depends on;
        # `i` is the counter that makes every node name unique.
        previous = previous or []
        dependencies = []

        if isinstance(canvas, chain):
            # Tasks of a chain run one after another.
            for t in canvas.tasks:
                if not isinstance(t, (group, chain)):
                    n = str(t) + " - (" + str(i) + ")"
                    i += 1
                    dependencies.append((n, previous))
                    previous = [n]
                else:
                    analysis = _analyze_canvas(t, previous, i)
                    dependencies.extend(analysis['dependencies'])
                    previous = analysis['previous']
                    i = analysis['i']  # keep the counter in sync after recursion
        elif isinstance(canvas, group):
            # Tasks of a group all depend on the same previous nodes.
            new_previous = []
            for t in canvas.tasks:
                if not isinstance(t, (group, chain)):
                    n = str(t) + " - (" + str(i) + ")"
                    i += 1
                    dependencies.append((n, previous))
                    new_previous.append(n)
                else:
                    analysis = _analyze_canvas(t, previous, i)
                    dependencies.extend(analysis['dependencies'])
                    new_previous.extend(analysis['previous'])
                    i = analysis['i']
            previous = new_previous
        elif isinstance(canvas, Signature):
            # A single task: name it after the canvas itself.
            n = str(canvas) + " - (" + str(i) + ")"
            i += 1
            dependencies.append((n, previous))
            previous = [n]

        return {"dependencies": dependencies, "previous": previous, "i": i}
It generates a dependency graph of the canvas. The idea is to iterate over the tasks of the canvas and identify the groups, chains and signatures in order to generate the right dependencies.
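To make the shape of the result concrete, here is a minimal sketch of the same idea that runs without Celery. The encoding is purely hypothetical and is not Celery's: a string stands for a task, a tuple for a chain and a list for a group. The name `analyze` and the use of `itertools.count` are my own choices for illustration.

```python
from itertools import count


def analyze(canvas, previous=(), counter=None):
    """Return (dependencies, last_nodes) for a nested stand-in canvas.

    Hypothetical encoding, not Celery's: a str is a task, a tuple is a
    chain (run in order), a list is a group (run in parallel).
    """
    counter = counter if counter is not None else count()
    previous = list(previous)

    if isinstance(canvas, str):
        # A single task gets a unique name from the shared counter.
        node = "%s - (%d)" % (canvas, next(counter))
        return [(node, previous)], [node]

    dependencies = []
    if isinstance(canvas, tuple):
        # Chain: each item depends on whatever the previous item ended with.
        for item in canvas:
            deps, previous = analyze(item, previous, counter)
            dependencies.extend(deps)
        return dependencies, previous

    if isinstance(canvas, list):
        # Group: every item depends on the same previous nodes.
        last = []
        for item in canvas:
            deps, item_last = analyze(item, previous, counter)
            dependencies.extend(deps)
            last.extend(item_last)
        return dependencies, last

    raise TypeError("unsupported canvas element: %r" % (canvas,))


deps, _ = analyze(("t1", "t2", ["t1", "t1", "t2"], "t2"))
for node, parents in deps:
    print(node, "<-", parents)
```

The last entry depends on all three members of the group, which is the fan-in you would expect to see in the final graph.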
Once you have the dependency list, you can use more Celery utilities to generate a dot file. Here is a small usage example:
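    # analyze_canvas is the function above, saved in celery_util.py
    from celery_util import analyze_canvas
    # Celery 3.x location; later releases moved this class to celery.utils.graph
    from celery.datastructures import DependencyGraph
    from celery import Celery, group

    app = Celery()


    @app.task
    def t1():
        pass


    @app.task
    def t2():
        pass


    canvas = t1.si() | t2.si() | group(t1.si(), t1.si(), t2.si()) | t2.si()
    d = analyze_canvas(canvas)
    dg = DependencyGraph(it=d)
    # Write the graph in Graphviz dot format
    with open("pipo.dot", "w+") as pipo:
        dg.to_dot(pipo)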
In this example I declare dummy tasks and chain/group them in a nice pretty canvas. I then use the Celery util DependencyGraph to have an object representation of the graph and the ability to dump it in dot format with the to_dot method.
And the beautiful result is: (image of the rendered graph)
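If you just want to see what the dot output looks like, or DependencyGraph is not at hand, the dependency list can also be turned into dot text by hand. This is only a minimal sketch, not Celery's implementation: `to_dot` and `quote` are hypothetical helpers, and the edge direction (parent to child) is my own choice.

```python
def quote(name):
    """Wrap a node name in double quotes, escaping embedded quotes."""
    return '"%s"' % name.replace('"', '\\"')


def to_dot(dependencies, name="canvas"):
    """Build Graphviz dot source from (node, [parents]) pairs."""
    lines = ["digraph %s {" % name]
    for node, parents in dependencies:
        # Declare the node so tasks without parents still appear.
        lines.append("    %s;" % quote(node))
        for parent in parents:
            # One edge per dependency, drawn from parent to child.
            lines.append("    %s -> %s;" % (quote(parent), quote(node)))
    lines.append("}")
    return "\n".join(lines)


example = [
    ("t1 - (0)", []),
    ("t2 - (1)", ["t1 - (0)"]),
    ("t1 - (2)", ["t2 - (1)"]),
    ("t2 - (3)", ["t2 - (1)"]),
]
dot_source = to_dot(example)
print(dot_source)
```

Either way, a dot file such as pipo.dot can be rendered to an image with Graphviz, for example `dot -Tpng pipo.dot -o pipo.png`.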