[python] How to check task status in Celery?

How does one check whether a task is running in celery (specifically, I'm using celery-django)?

I've read the documentation, and I've googled, but I can't see a call like:

my_example_task.state() == RUNNING

My use-case is that I have an external (java) service for transcoding. When I send a document to be transcoded, I want to check if the task that runs that service is running, and if not, to (re)start it.

I'm using the current stable versions - 2.4, I believe.



I found helpful information in the Celery Project Workers Guide, in the section on inspecting workers.

In my case, I am checking whether Celery is running at all.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

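registered() returns None when no worker replies, which is how this snippet detects that Celery is down. You can experiment with inspect to fit your own needs.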
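For the original use case (is a particular task running right now?), the same inspect object also offers active(). As far as I know, it returns a dict mapping each worker's name to a list of task descriptions with keys such as 'id' and 'name', or None when no worker replies. Here is a minimal sketch under that assumption; find_active and the task name tasks.transcode are made up for illustration:

```python
from typing import Dict, List, Optional


def find_active(active: Optional[Dict[str, List[dict]]], task_name: str) -> List[str]:
    """Return the ids of currently executing tasks called `task_name`.

    `active` is meant to be the value of app.control.inspect().active().
    """
    if not active:  # None or {}: no worker replied
        return []
    return [
        task["id"]
        for tasks in active.values()
        for task in tasks
        if task.get("name") == task_name
    ]


# Hand-written sample shaped like an inspect().active() reply (an assumption).
sample = {
    "celery@host1": [{"id": "abc", "name": "tasks.transcode"}],
    "celery@host2": [],
}
print(find_active(sample, "tasks.transcode"))  # ['abc']
print(find_active(None, "tasks.transcode"))    # []
```

An empty list then means "not running, (re)start it". Note that this only sees tasks a worker is executing at that moment, not tasks still waiting in the queue.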


Creating an AsyncResult object from the task id is the way recommended in the FAQ to obtain the task status when the only thing you have is the task id.

However, as of Celery 3.x, there are significant caveats that could bite people if they do not pay attention to them. It really depends on the specific use-case scenario.

By default, Celery does not record a "running" state.

In order for Celery to record that a task is running, you must set task_track_started to True. Here is a simple task that tests this:

@app.task(bind=True)
def test(self):
    print(self.AsyncResult(self.request.id).state)

When task_track_started is False, which is the default, the state shown is PENDING even though the task has started. If you set task_track_started to True, then the state will be STARTED.
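For reference, this is roughly how the setting can be switched on with the Celery 4.x lowercase setting names. The app name "proj" and the in-memory broker and backend URLs are placeholders I picked for illustration; use your own:

```python
from celery import Celery

# "proj" and the in-memory URLs are placeholders, not a recommendation.
app = Celery("proj", broker="memory://", backend="cache+memory://")

# Report STARTED once a worker begins executing a task (the default is False).
app.conf.task_track_started = True
```

A result backend has to be configured for any of this to be visible, since the STARTED state is stored there like every other state.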

The state PENDING means "I don't know."

An AsyncResult with the state PENDING does not mean anything more than that Celery does not know the status of the task. This could be because of any number of reasons.

For one thing, AsyncResult can be constructed with invalid task ids. Such "tasks" will be deemed pending by Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

OK, so nobody is going to feed obviously invalid ids to AsyncResult. Fair enough, but this also means that AsyncResult will report as PENDING a task that ran successfully but that Celery has since forgotten. Again, in some use-case scenarios this can be a problem. Part of the issue hinges on how Celery is configured to keep the results of tasks, because it depends on the availability of the "tombstones" in the results backend. ("Tombstones" is the term used in the Celery documentation for the data chunks that record how the task ended.) Using AsyncResult won't work at all if task_ignore_result is True. A more vexing problem is that Celery expires the tombstones by default. The result_expires setting defaults to 24 hours. So if you launch a task, record its id in long-term storage, and create an AsyncResult with it more than 24 hours later, the status will be PENDING.

All "real tasks" start in the PENDING state. So getting PENDING on a task could mean that the task was requested but never progressed further than this (for whatever reason). Or it could mean the task ran but Celery forgot its state.
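One way to narrow down what PENDING means is to keep your own record of the ids you dispatched. The sketch below is only an illustration of that idea: describe_state and the two extra labels are names I made up, not Celery states.

```python
from typing import Set


def describe_state(task_id: str, state: str, dispatched_ids: Set[str]) -> str:
    """Interpret a Celery state string using our own log of dispatched ids."""
    if state != "PENDING":
        return state  # Celery has a definite record for this task
    if task_id not in dispatched_ids:
        return "UNKNOWN_ID"  # we never sent this id, so it is not a real task
    # We did send it: either it has not started (or is not tracked as started),
    # or it ran and its tombstone expired.
    return "QUEUED_OR_FORGOTTEN"


dispatched = {"09dad9cf"}
print(describe_state("invalid", "PENDING", dispatched))   # UNKNOWN_ID
print(describe_state("09dad9cf", "PENDING", dispatched))  # QUEUED_OR_FORGOTTEN
print(describe_state("09dad9cf", "SUCCESS", dispatched))  # SUCCESS
```

This still cannot tell a queued task from a forgotten one; for that you also need to record when the task was sent or what it produced.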

Ouch! AsyncResult won't work for me. What else can I do?

I prefer to keep track of goals rather than the tasks themselves. I do keep some task information, but it is secondary to keeping track of the goals. The goals are stored in storage independent from Celery. When a request needs to perform a computation that depends on some goal having been achieved, it checks whether the goal has already been achieved. If yes, it uses the cached goal; otherwise it starts the task that will effect the goal and sends the client that made the HTTP request a response indicating that it should wait for a result.
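To make the idea concrete, here is a toy in-memory version. It is a sketch, not what I actually run: GoalStore and its method names are invented for this example, and a real setup would keep the goals in a database or cache rather than in a dict.

```python
from typing import Any, Callable, Dict, Tuple


class GoalStore:
    """Tracks goals independently of Celery's result backend."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}      # goal -> cached result
        self._in_progress: Dict[str, str] = {}  # goal -> task id

    def request(self, goal: str, start_task: Callable[[], str]) -> Tuple[str, Any]:
        """Return ("done", result) or ("wait", task_id), starting work if needed."""
        if goal in self._results:
            return "done", self._results[goal]
        if goal not in self._in_progress:
            # start_task would wrap something like my_task.delay(...).id
            self._in_progress[goal] = start_task()
        return "wait", self._in_progress[goal]

    def complete(self, goal: str, result: Any) -> None:
        """Called by the task itself when it has achieved the goal."""
        self._in_progress.pop(goal, None)
        self._results[goal] = result


store = GoalStore()
print(store.request("transcode:doc1", lambda: "task-1"))  # ('wait', 'task-1')
store.complete("transcode:doc1", "doc1.pdf")
print(store.request("transcode:doc1", lambda: "task-2"))  # ('done', 'doc1.pdf')
```

Because the task writes the goal itself, an expired tombstone no longer matters: the question becomes "is the goal there?" rather than "what does Celery remember?".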


The setting names above are for Celery 4.x. In 3.x the corresponding settings are: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.


For simple tasks, we can use Flower (http://flower.readthedocs.io/en/latest/screenshots.html) and Jobtastic (http://policystat.github.io/jobtastic/) to do the monitoring.

For complicated tasks, say a task that deals with many other modules, we recommend manually recording the progress and a message for each specific task unit.


res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())

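  • First, in your Celery app module:

vi my_celery_apps/app1.py

app = Celery(worker_name)

  • Next, in the task file, import app from your Celery app module:

vi tasks/task1.py

from my_celery_apps.app1 import app

def task_succeeded(taskid):
    """Return True only if the task finished successfully."""
    try:
        # state is an upper-case string such as "SUCCESS"
        return app.AsyncResult(taskid).state == "SUCCESS"
    except Exception:
        # do something here, e.g. log that the result backend is unreachable
        return False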


Apart from the programmatic approaches above, task status can easily be seen using Flower.

Flower is a web-based tool for monitoring and administering Celery clusters in real time using Celery events. Its features include:

  1. Task progress and history
  2. Ability to show task details (arguments, start time, runtime, and more)
  3. Graphs and statistics

Official documentation: Flower - Celery monitoring tool

Installation:

$ pip install flower

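Usage: start Flower next to your workers (the exact command depends on your Celery and Flower versions), then open its web UI, which listens on port 5555 by default:

http://localhost:5555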

An answer from 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    # adjacent f-strings are joined, so no stray indentation ends up in the response
    return (f"Thanks for your patience, your job {process.task_id} "
            f"is being processed. Status {state}")

Old question but I recently ran into this problem.

If you're trying to get the task_id you can do it like this:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Now you know exactly what the task_id is and can now use it to get the AsyncResult:

# grab the AsyncResult
result = celery.result.AsyncResult(task_id)

# print the task id
print(result.task_id)
# 09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print(result.status)
# SUCCESS

# print the result returned
print(result.result)
# 4

Try:

task.AsyncResult(task.request.id).state

This returns the Celery task's state. If the task is already in the FAILURE state, it will throw an exception:

raised unexpected: KeyError('exc_type',)


Return the task_id (which is given from .delay()) and ask the celery instance afterwards about the state:

x = method.delay(1,2)
print(x.task_id)

When asking, get a new AsyncResult using this task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
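ready() only tells you whether the task has finished (successfully or not) at the moment you ask, so in practice you end up polling it. Below is a small sketch of such a loop with a timeout. wait_until_ready and FakeResult are names I invented; FakeResult just stands in for an AsyncResult so the snippet runs without a broker.

```python
import time


def wait_until_ready(result, timeout: float = 10.0, interval: float = 0.5) -> bool:
    """Poll result.ready() until it is True or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if result.ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakeResult:
    """Stand-in for AsyncResult: becomes ready after a few polls."""

    def __init__(self, polls_needed: int) -> None:
        self.polls_left = polls_needed

    def ready(self) -> bool:
        self.polls_left -= 1
        return self.polls_left <= 0


print(wait_until_ready(FakeResult(3), timeout=1.0, interval=0.01))        # True
print(wait_until_ready(FakeResult(10**9), timeout=0.05, interval=0.01))   # False
```

With a real task you would pass the AsyncResult built from your task id instead of FakeResult, then check res.state or res.successful() once it returns True.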

Just use this API from the Celery FAQ:

result = app.AsyncResult(task_id)

This works fine.


You can also create custom states and update their values during task execution. This example is from the docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
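On the calling side, the custom state shows up as result.state and the meta dict as result.info. The helper below is my own sketch for turning that pair into a fraction (progress_fraction is not part of Celery); it takes plain values so it can be tried without a worker.

```python
def progress_fraction(state: str, info) -> float:
    """Map a (state, info) pair from an AsyncResult to a value in [0, 1]."""
    if state == "SUCCESS":
        return 1.0
    # For the custom PROGRESS state, info is the meta dict passed to update_state.
    if state == "PROGRESS" and isinstance(info, dict) and info.get("total"):
        return info["current"] / info["total"]
    return 0.0  # PENDING, STARTED, FAILURE, or anything unexpected


print(progress_fraction("PROGRESS", {"current": 2, "total": 8}))  # 0.25
print(progress_fraction("PENDING", None))                         # 0.0
print(progress_fraction("SUCCESS", "whatever the task returned")) # 1.0
```

With a real task you would call it as progress_fraction(result.state, result.info).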

