詳解Python Celery和RabbitMQ實戰教程
Celery是一個異步任務隊列。它可以用于需要異步運行的任何內容。RabbitMQ是Celery廣泛使用的消息代理。在本這篇文章中,我將使用RabbitMQ來介紹Celery的基本概念,然后為一個小型演示項目設置Celery 。最后,設置一個Celery Web控制臺來監視我的任務
基本概念來!看圖說話:
BrokerBroker(RabbitMQ)負責創建任務隊列,根據一些路由規則將任務分派到任務隊列,然后將任務從任務隊列交付給worker
Consumer (Celery Workers)Consumer是執行任務的一個或多個Celery workers。可以根據用例啟動許多workers
Result Backend后端用于存儲任務的結果。但是,它不是必需的元素,如果不在設置中包含它,就無法訪問任務的結果
安裝Celery首先,需要安裝好Celery,可以使用PyPI:
pip install celery選擇一個Broker:RabbitMQ
為什么我們需要broker呢?這是因為Celery本身并不構造消息隊列,所以它需要一個額外的消息傳輸來完成這項工作。這里可以將Celery看作消息代理的包裝器
實際上,也可以從幾個不同的代理中進行選擇,比如RabbitMQ、Redis或數據庫(例如Django數據庫)
在這里使用RabbitMQ作為代理,因為它功能完整、穩定,Celery推薦使用它。由于演示我的環境是在Mac OS中,安裝RabbitMQ使用Homebrew即可:
brew install rabbitmq#如果是Ubuntu的話使用apt-get安裝啟動RabbitMQ
程序將在/usr/local/sbin中安裝RabbitMQ,雖然有些系統可能會有所不同。可以將此路徑添加到環境變量路徑,以便以后方便地使用。例如,打開shell啟動文件~/.bash_profile添加:
PATH=$PATH:/usr/local/sbin
現在,可以使用rabbitmq-server命令啟動我們的RabbitMQ服務器。檢查RabbitMQ服務器成功啟動,將看到類似的輸出:
RabbitMQ使用Celery之前,需要對RabbitMQ進行一些配置。簡單地說,我們需要創建一個虛擬主機和用戶,然后設置用戶權限,以便它可以訪問虛擬主機
# 添加用戶跟密碼$ rabbitmqctl add_user test test123# 添加虛擬主機$ rabbitmqctl add_vhost test_vhost# 為用戶添加標簽$ rabbitmqctl set_user_tags test test_tag# 設置用戶權限$ rabbitmqctl set_permissions -p test_vhost test '.*' '.*' '.*'
敲黑板!RabbitMQ中有三種操作:配置、寫入和讀取
上面命令末尾的字符串表示用戶test將擁有所有配置、寫入和讀取權限
演示項目現在讓我們創建一個簡單的項目來演示Celery的使用
在celery.py中添加以下代碼:
from __future__ import absolute_importfrom celery import Celeryapp = Celery(’test_celery’,broker=’amqp://test:test123@localhost/test_vhost’,backend=’rpc://’,include=[’test_celery.tasks’])
在這里,初始化了一個名為app的Celery實例,將用于創建一個任務。Celery的第一個參數只是項目包的名稱,即“test_celery”。
broker參數指定代理URL,對于RabbitMQ,傳輸是amqp。
后端參數指定后端URL。Celery中的后端用于存儲任務結果。因此,如果需要在任務完成時訪問任務的結果,應該為Celery設置一個后端。
rpc意味著將結果作為AMQP消息發送回去,這對本次演示來說是一種可接受的格式
include參數指定了在Celery工作程序啟動時要導入的模塊列表。我們在這里添加了tasks模塊,以便找到我們的任務。
在tasks.py這個文件中,定義了我們的任務add_longtime:
from __future__ import absolute_importfrom test_celery.celery import appimport time@app.taskdef add_longtime(a, b): print ’long time task begins’ # sleep 5 seconds time.sleep(5) print ’long time task finished’ return a + b
可以看到,導入了在前面的Celery模塊中定義的應用程序,并將其用作任務方法的裝飾器。另外注意!app.task只是一個裝飾器。此外,我們在add_longtime任務中休眠5秒,以模擬一個耗時較長的Task
在設置好Celery之后,我們需要開始運行任務,它包含在runs_tasks.py:
from .tasks import add_longtimeimport timeif __name__ == ’__main__’: result = add_longtime.delay(1,2)#此時,任務還未完成,它將返回False print ’Task finished? ’, result.ready() print ’Task result: ’, result.result # 延長到10秒以確保任務已經完成 time.sleep(10) # 現在任務完成,ready方法將返回True print ’Task finished? ’, result.ready() print ’Task result: ’, result.result
這里,我們使用delay方法調用任務add_longtime,如果我們想異步處理任務,就需要使用delay方法。此外,保存任務的結果并打印一些信息。如果任務已經完成,ready方法將返回True,否則返回False。result屬性是任務的結果,如果任務尚未完成,則返回None。
啟動Celery現在,可以使用下面的命令啟動Celery(注:在項目文件夾中運行):
celery -A test_celery worker --loglevel=info
Celery成功連接到RabbitMQ,你會看到這樣的東西:
再項目文件中輸入以下命令運行它:
python -m test_celery.run_tasks
查看Celery控制臺,看到運行任務:
[2020-05-15 17:15:21,508: INFO/MainProcess] Received task: test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3][2020-05-15 17:15:21,508: WARNING/Worker-3] long time task begins[2020-05-15 17:15:31,510: WARNING/Worker-3] long time task finished[2020-05-15 17:15:31,512: INFO/MainProcess]Task test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3
當Celery收到一個任務,它打印出任務名稱與任務id(在括號中):
Received task: test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5]
在這一行下面是我們的任務add_longtime打印的兩行,時間延遲為5秒:
long time task beginslong time task finished
最后一行顯示我們的任務在5秒內完成,任務結果為3:
Task test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3
在當前控制臺中,您將看到以下輸出:
Flower是一款基于網絡的Celery實時監控軟件。使用Flower,可以輕松地監視任務進度和歷史記錄
使用pip來安裝Flower:
pip install flower
要啟動Flower web控制臺,需要運行以下命令:
celery -A test_celery flower
Flower將運行具有默認端口5555的服務器,可以通過http://localhost:5555訪問web控制臺
到此這篇關于詳解Python Celery和RabbitMQ實戰教程的文章就介紹到這了,更多相關Python Celery和RabbitMQ實戰內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
