概述:
我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用一些開源工具的sdk和api,或是運行一些耗時比較大的任務(單個大任務下可能有多個小任務),需要一段時間才能提供執行結果,而前端同事要求不能讓用戶在頁面等待,需要馬上提供一個回傳結果給他,任務執行完后可以拿到最終結果,并且用戶退出web界面或瀏覽器例外關閉之后,再次回傳界面,執行的程序不會中斷,并且支持多用戶同時執行不同操作的需要,
很明顯,這是一個-異步多執行緒-的場景,在Python中可以想到的有:
1.引入Asyncio模塊,利用多協程實作,
2.使用Threading模塊,自己撰寫執行緒任務,執行緒等待,睡眠,釋放執行緒的程序,
3.使用異步框架,例如Cerely、Tornado、Twisted等等,裝飾異步任務,
這里邊最便捷且開發效率最高的應該是使用異步框架,咱們選擇使用Celery來實作這個需求,
Celery介紹:
截圖與描述來自celery官網:Celery - Distributed Task Queue — Celery 5.2.0 documentation
Celery 是一個簡單、靈活且可靠的分布式系統,用于處理大量訊息,同時為操作提供維護此類系統所需的工具,
它是一個專注于實時處理的任務佇列,同時也支持任務調度,
Celery 擁有龐大而多樣化的用戶和貢獻者社區,您應該加入我們的 IRC 或我們的郵件串列,
Celery 是開源的,并在BSD 許可下獲得許可,

消費者與消費結果:
我們除了需要Celery做異步任務的處理,還需要一個中間件來充當消費者,并保存最終的任務處理結果(消費結果),這里有很多中間件可以選,例如常用的訊息中間件,rabbitmq,kafka等,還可以使用mysql,redis等作為消費者并保存消費結果(因為最終的處理結果要回傳給前端同事),樓主最終選擇了redis,
Redis安裝與配置:
這里不再贅述windows下安裝redis步驟,只介紹linux下安裝redis與配置,我的機器是centos7.6:
yum方式安裝(注意:這樣安裝的redis不是最新版本的,如有對版本要求比較高的,建議去官網下載原始碼包去手動安裝,官網地址:Redis,最新版本:6.2.6)
yum -y install redis
安裝完成之后配置redis.conf檔案:
vi /etc/redis.conf
修改這一行,改成 0.0.0.0,這樣別的應用和組件才可以訪問到redis的服務與埠:

同理,redis的默認埠也可以在此配置里修改:

還有一些關閉匿名訪問,設定密碼等配置的修改,專案若要上到公網環境下,建議配置,
啟動并測驗redis服務功能是否正常:
啟動redis:
redis-cli -h 0.0.0.0
測驗redis:
1 redis> set name "zzz"
2
3 OK
4
5 redis> get name
6
7 "zzz"
記住,代碼并沒有實際參考redis,但也需要安裝redis模塊,否則會報錯,(redis模塊版本不要太高,高了也會報錯,這些坑都是樓主親自趟過的,我這里使用2.10.6)
pip install redis==2.10.6
Celery的安裝和配置:
windos和linux下都可以使用pip安裝:
pip install celery==3.1.25
我的專案目錄:(celeryconfig.py與__init__.py檔案為celery與redis組態檔):

在專案中先創建一個名為config的python目錄,并在__init__.py中匯入celery模塊并配置:
__init__.py:
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
app = Celery('prod') # 創建 Celery 實體
app.config_from_object('kernel.config.celeryconfig') # 通過 Celery 實體加載配置模塊
platforms.C_FORCE_ROOT = True 這個配置一定要有,否則會報權限問題,
在config目錄下的celeryconfig.py中配置任務佇列消費者與消費結果保存在redis的地址:
celeryconfig.py:
## celery配置
BROKER_URL = 'redis://redis-host:6379/1' # 指定 Broker消費者,我們使用redis 1號資料庫
CELERY_RESULT_BACKEND = 'redis://redis-host:6379/2' # 指定 Backend,最終消費結果,我們使用redis 2號資料庫
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區,默認是 UTC
CELERY_IMPORTS = ( # 指定匯入的任務模塊
'kernel.views.api' ## 異步任務代碼檔案路徑即可
)
至此,前期需要的工具準備作業全部完畢,我們開始我們的開發任務,
異步任務開發:
樓主因為主要負責后端這塊,這里選擇使用flask來寫,整體的專案模塊與版本,大概羅列下:
Python 3.5.4
Mysql 5.5.64
Celery==3.1.25
Flask==1.1.4
Redis==2.10.6
這時我們與前端同事再次詳細溝通了下,初步約定如下:
1.前端通過form表單傳資料給后端,格式為json,分析:需要決議json資料,
2.因為存在長耗時的任務,要求一旦前端請求過來,后端要馬上回傳一個中間結果給前端(這樣解決了前端頁面等待的問題),分析:需要馬上提供一個回傳結果,
3.前端最終要拿到任務的最終執行結果,分析:我們需要把長耗時異步任務的最終結果推送給前端,需要任務代碼最后推送執行結果,(自己先定義回呼介面去測驗)
1.后端Flask介面代碼:
檔案名稱與路徑:
專案名稱-kernel-view-api.py,與celery配置下的任務模塊對應,

api.py:
# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.models.playbook import PlayBook_file
from kernel.utils import render_response, Retval
from kernel.models import db
from sqlalchemy import or_,text
import gitlab ## 匯入gitlab模塊
from kernel.config import app, cmdb_config,hcacp_config
import pymysql,uuid,hashlib,time
from datetime import timezone
bp = Blueprint('test', __name__) ## 藍圖自己定義,這里只是實體化
log = logging.getLogger(__name__) ## 日志自己定義,這里只是實體化
class status: ## 定義一些狀態碼
success = 0
warning = 1
pending = 2
faild = -1
## 回呼介面
@bp.route('/test/callback/', methods=['GET', 'POST'])
def ansible_aaa():
data1 = request.get_data(as_text=True)
# data2 = json.loads(data1)
log.info(data1)
return data1
@bp.route('/test/add/', methods=['POST', 'GET'])
def devops_add():
'''
獲取form表單json資料
'''
# return True
try:
data = request.get_data()
_data = json.loads((str(data, 'utf-8')))
print(_data)
except Exception as requestdata_except:
log.error('獲取表單資料例外,例外原因:%s' % requestdata_except)
return render_response(status.faild, u"獲取表單資料例外,例外原因:%s" % requestdata_except, {})
## 獲取標識tag的結果
try:
'''
工單json資料要帶工單識別符號select_tag:
create_project:新建專案申請工單
'''
select_tag = _data.get('select_tag')
except Exception as request_select_tag_except:
log.error('獲取表單需求標識select_tag例外,例外原因:%s' % request_select_tag_except)
return render_response(status.faild, u"獲取表單需求標識select_tag例外,例外原因:%s" % request_select_tag_except, {})
try:
"""
!--當引數select_tag == create_project 時,建立專案--!
"""
if select_tag == 'create_project':
projname = _data.get('projname')
add_project_result = add_project.delay(cmdb_config, _data)
return render_response(status.pending, u"devops系統添加專案工單任務執行中--pending--", {'專案中文名稱': projname})
except Exception as do_celery_job_except:
log.error('執行異步celery任務例外,例外原因:%s' % do_celery_job_except)
return render_response(status.faild, u"執行異步celery任務例外,例外原因:%s" % do_celery_job_except, {})
這里代表前端請求過來之后,馬上回傳一個執行結果,滿足需求2:

在devops_add介面里執行異步任務:
add_project_result = add_project.delay(cmdb_config, _data)
官網的示例:

## 1.擴號里為異步任務所需的引數
## 2.add_project_result 是異步任務執行的物件,包含很多屬性方法,下邊介紹一些常用的:
獲取任務結果和狀態:
add_project_result = task.apply_async()
add_project_result.ready() # 查看任務狀態,回傳布林值, 任務執行完成, 回傳 True, 否則回傳 False.
add_project_result.wait() # 會阻塞等待任務完成, 回傳任務執行結果,很少使用;
add_project_result.get(timeout=1) # 獲取任務執行結果,可以設定等待時間,如果超時但任務未完成回傳None;
add_project_result.result # 任務執行結果,未完成回傳None;
add_project_result.state # PENDING, START, SUCCESS,任務當前的狀態
add_project_result.status # PENDING, START, SUCCESS,任務當前的狀態
add_project_result.successful # 任務成功回傳true
add_project_result.traceback # 如果任務拋出了一個例外,可以獲取原始的回溯資訊
2.異步任務代碼:
檔案名稱與路徑:
專案名稱-kernel-view-api.py
api.py
解釋:
因為要滿足需求3,把最終異步耗時任務的真正結果給到前端,所以我們需要在異步任務里寫一個回呼的操作,
header = {'Content-Type': 'application/json'} ## 構造請求頭和資料型別
_json = {"status": sttaus.faild, "msg": u"失敗", "data": {}} ## 失敗就回傳給前端json型別失敗
_json = {"status": sttaus.success, "msg": u"成功", "data": {}} ## 成功就回傳給前端json型別成功
requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 帶參回呼請求
# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.utils import render_response, Retval
from datetime import timezone
from kernel.config import * ## 匯入config目錄下的celery配置
bp = Blueprint('test', __name__) ## 藍圖自己定義,這里只是實體化
log = logging.getLogger(__name__) ## 日志自己定義,這里只是實體化
class status: ## 定義一些狀態碼
success = 0
warning = 1
pending = 2
faild = -1
## 示例函式:一個添加資訊函式,前端給我們json資料,后端接受之后去插入資料庫,完成操作并告訴前端
@app.task ## celery添加專案任務
def add_project(mysql_config, _data):
try:
## 系統添加專案資訊工單
projname = _data.get('projname') ## 專案名稱,必填
prodesc= _data.get('prodesc') ## 專案描述,必填
projctime = datetime.datetime.now() ## 專案發布時間
callback_url = _data.get('callback_url') ## 回呼介面地址
except Exception as describe_form_except:
log.error('決議表單資料出現例外,例外原因:%s' % describe_form_except)
header = {'Content-Type': 'application/json'} ## 回呼介面請求頭
_json = {"status": status.faild, "msg": u"失敗", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
try:
# 獲取資料庫連接
conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db)
# 回傳連接
cursor = conn.cursor()
except Exception as connect_except:
log.error('系統資料庫連接出現例外,例外原因:%s' % connect_except)
_json = {"status": status.faild, "msg": u"失敗", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
try:
proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ('{}','{}','{}');".format(projname, prodesc, projctime)
cursor.execute(proj_sql)
conn.commit()
_json = {"status": status.success, "msg": u"成功", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
## 任務執行完成之后呼叫回呼介面,回傳任務執行成功結果
log.info('系統建專案工單執行成功,%s' % proj_sql)
except Exception as do_add_project_except:
_json = {"status": status.faild, "msg": u"失敗", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
log.error('執行添加專案工單例外,例外原因:%s' % do_add_project_except)
## 任務執行完成之后呼叫回呼介面,回傳任務執行失敗結果
樓主用的最簡單,沒有在task里寫一些屬性,類似下邊的這種方式還可以給task添加一些屬性:
@app.task(name='test',bind=True,base=BaseTask)
補充介紹下異步task有的一些屬性:
TASK的一般屬性:
Task.name:任務名稱;
Task.request:當前任務的資訊;
Task.max_retries:設定重試的最大次數
Task.throws:預期錯誤類的可選元組,不應被視為實際錯誤,而是結果失敗;
Task.rate_limit:設定此任務型別的速率限制
Task.time_limit:此任務的硬限時(以秒為單位),
Task.ignore_result:不存盤任務狀態,默認False;
Task.store_errors_even_if_ignored:如果True,即使任務配置為忽略結果,也會存盤錯誤,
Task.serializer:標識要使用的默認序列化方法的字串,
Task.compression:標識要使用的默認壓縮方案的字串,默認為task_compression設定,
Task.backend:指定該任務的結果存盤后端用于此任務,
Task.acks_late:如果設定True為此任務的訊息將在任務執行后確認 ,而不是在執行任務之前(默認行為),即默認任務執行之前就會發送確認;
Task.track_started:如果True任務在作業人員執行任務時將其狀態報告為“已啟動”,默認是False;
我們啟動celery來看下celery里在執行任務的程序中有什么變化:
(1)啟動專案:
樓主用的是gunicorn工具啟動,配置多執行緒:
gunicorn.conf
workers = 16 ## 多執行緒配置
bind = '0.0.0.0:7777'
proc_name = 'websocket(專案名稱)'
limit_request_field_size = 0
limit_request_line = 0
log_level = 'error'
debug = True
chdir = '/data/websocket' ## 專案目錄
啟動命令:gunicorn -c /專案目錄/gunicorn.conf kernel:app
(2)啟動celery:
cd 到專案目錄下,執行 celery -A kernel.views.api worker -l info
(3)使用postman呼叫介面:
可以看到直接先回傳我們狀態碼2-等待狀態:

(4)從日志看異步任務執行程序:
1.會先在celery里出現一個異步任務,并生成一個異步任務的task-id號:

2.redis去查看是否已有task任務,task-id號是一致的:
用add_project_result保存異步任務執行結果的物件,最終的結果是在redis中,我們也可以去redis里去拿,redis保存的結果,
我們用的redis 2號資料庫,select 2 號資料庫,keys * 查看redis是否已有任務

任務最終的執行結果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直觀,succeded代表異步任務執行成功):

3. 查看專案日志,狀態碼為1,是回呼介面列印出來的,代表回傳給回呼介面最終結果是成功,
4.最終去資料庫看下新添加記錄是否已有,這里就不截圖了,記錄插入成功,異步任務執行成功,也滿足了開始我們溝通的三個需求,
5.前端同學給你豎起了大拇指,直呼你牛!

備注:
??????? ??????? ??????? ??????? ??????? ???????
celery還可以用來做定時任務,感興趣的伙伴們可以去官網或者其他途徑去研究下,樓主第一次寫這么大的博客,有些地方我描述不清楚的或者您沒太看懂的可以私信我答疑解惑,我的微信zcw576020095,熱愛python,熱愛運維,一起加油!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/355339.html
標籤:python
