使用 Celery Once 來防止 Celery 重復(fù)執(zhí)行同一個任務(wù)
在使用 Celery 的時(shí)候發(fā)現(xiàn)有的時(shí)候 Celery 會將同一個任務(wù)執(zhí)行兩遍,我遇到的情況是相同的任務(wù)在不同的 worker 中被分別執(zhí)行,并且時(shí)間只相差幾毫秒。這問題我一直以為是自己哪里處理的邏輯有問題,后來發(fā)現(xiàn)其他人 也有類似的問題,然后基本上出問題的都是使用 Redis 作為 Broker 的,而我這邊一方面不想將 Redis 替換掉,就只能在 task 執(zhí)行的時(shí)候加分布式鎖了。
不過在 Celery 的 issue 中搜索了一下,有人使用 Redis 實(shí)現(xiàn)了分布式鎖,然后也有人使用了 Celery Once。 大致看了一下 Celery Once ,發(fā)現(xiàn)非常符合現(xiàn)在的情況,就用了下。
Celery Once 也是利用 Redis 加鎖來實(shí)現(xiàn), Celery Once 在 Task 類基礎(chǔ)上實(shí)現(xiàn)了 QueueOnce 類,該類提供了任務(wù)去重的功能,所以在使用時(shí),我們自己實(shí)現(xiàn)的方法需要將 QueueOnce 設(shè)置為 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 參數(shù)表示,在遇到重復(fù)方法時(shí)的處理方式,默認(rèn) graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設(shè)置為 True,則靜默處理。
另外如果要手動設(shè)置任務(wù)的 key,可以指定 keys 參數(shù)
@celery.task(base=QueueOnce, once={'keys': ['a']}) def slow_add(a, b): sleep(30) return a + b
總得來說,分為幾步
第一步,安裝
pip install -U celery_once
第二步,增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } }
第三步,修改 delay 方法
example.delay(10) # 修改為 result = example.apply_async(args=(10))
第四步,修改 task 參數(shù)
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']}) def slow_add(a, b): sleep(30) return a + b
參考鏈接 https://github.com/cameronmaske/celery-once
到此這篇關(guān)于使用 Celery Once 來防止 Celery 重復(fù)執(zhí)行同一個任務(wù)的文章就介紹到這了,更多相關(guān) Celery 重復(fù)執(zhí)行同一個任務(wù)內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。