If you are going to run multiple workers and got some key error when doing async job in celery like (KeyError, Received unregistered task of type). 
This would be a solution.


Key statement

"Use different Queue name and run worker with that Queue name". reference


0. Structure

folder/

tasks/

some_tasks.py

the_other_tasks.py

scheduler.py 


1. scheduler.py

# import

from tasks.some_tasks import sum
from tasks.the_other_tasks import add

# use queue name, when you call task function
sum.apply_async(queue="some_tasks")
add.apply_async(queue="the_other_tasks")


2. tasks

A. some_tasks.py

app = Celery(..)
app.conf.task_default_queue = "some_tasks" 


B. the_other_tasks.py

app = Celery(..)
app.conf.task_default_queue = "the_other_tasks" 



3. running workers
$folder>celery -A tasks.some_tasks worker --loglevel=info --concurrency=1 -Q some_tasks
$folder>celery -A tasks.the_other_tasks worker --loglevel=info --concurrency=1 -Q the_other_tasks

*If you want to give name to worker, use -n option.
example) celery -A tasks.the_other_tasks worker --loglevel=info --concurrency=1 -n the_other_tasks -Q the_other_tasks

보통 celery.py와 tasks.py를 두고 worker를 실행시키고

애플리케이션 서버에서는 tasks.py를 import해서 사용한다.


근데, 여기서 사용자의 눈에는 안보이는 것이있다.

워커를 실행할때는 당연히 celery.py를 이용해서 실행하기 때문에 별 이상해 보일것이 없는데,

잡을 줄때는 어떻게 이녀석이 redis에 커넥션이 되서 일을 전달 할 수 있는 것일까?


tasks.py를 보면 우리가 사용한 app은 결국 celery.py에서 가져온 것이기 때문에 일을 주는 애들도 각각 이 celery.py를 실행하여 celery app instance를 만들기 때문에 저런 상황이 가능한 것이다. 즉, 워커들도 각각 celery.py를 실행하듯, 사용하는 쪽에서도 celery.py를 각각 실행해서 app을 가지고 있다는 것!


Python Celery를 보면 기본적으로 워커와 잡의 개념이기 때문에

일단 task를 요청하면 무조건 잡이 큐에 들어갈 것같다.

하지만 이건 오해다.


진실을 말하자면,

잡은 sync와 async로 요청을 할 수 있는데, 이때 sync로 요청하면 큐에 들어가지 않고 부른 프로세스에서 걍 처리한다. 

이렇게 직접 일을 처리하기위함도 있고 기타 등등의 설정을 유지하기 위해 워커뿐 아니라 잡을 요청하는 녀석도 celery app을 실행시키는 스크립트를 실행한다. (보통 celery.py 라고 명명하고 쓰는..)


그래서 주의할 것은

다이나믹 실행환경을 쓸경우 env라는 키를 만들어서 큐 Broker url을 동적으로 넣어줄 수 있는데, 이때 워커가 실행될때 설정되는것 이외에 잡을 던지는 녀석이 실행될때도 올바른 Broker url 이 들어가도록 해야한다는 것이다.

task revoke를 한다음에 다시 scheduled task를 가져오면 이전에 revoke된것들이 포함되어있다. 알고보니 revoked가 따로 있음 -_-

아직 revoke된거를 제외한 나머지 task만 가져오는건 못찾음.. 일단 revoke 안된것만 가져오는건 두 배열을 비교하는 수 밖에..


from celery.task.control import inspect

from celery.task.control import revoke


i = inspect()

queues = i.scheduled()

queues_revoked = i.revoked()

keys = queues.keys()

all_tasks = []

revoked_tasks = []

tasks = []


if len(keys) > 0:

    all_tasks = queues[keys[0]]

    revoked_tasks = queues_revoked[keys[0]]


for task in all_tasks:

    if task['request']['id'] not in revoked_tasks:

        revoke(task['request']['id'], terminate=True) 


Schduled push

http://docs.celeryproject.org/en/latest/reference/celery.app.task.html?highlight=apply_async#celery.app.task.Task.apply_async

apply_async eta 파라미터로 datetime object를 넘겨주면됨.

+ Recent posts