當前位置:編程學習大全網 - 源碼下載 - python2.7怎麽實現異步

python2.7怎麽實現異步

改進之前

之前,我的查詢步驟很簡單,就是:

前端提交查詢請求 --> 建立數據庫連接 --> 新建遊標 --> 執行命令 --> 接受結果 --> 關閉遊標、連接

這幾大步驟的順序執行。

這裏面當然問題很大:

建立數據庫連接實際上就是新建壹個套接字。這是進程間通信的幾種方法裏,開銷最大的了。

在“執行命令”和“接受結果”兩個步驟中,線程在阻塞在數據庫內部的運行過程中,數據庫連接和遊標都處於閑置狀態。

這樣壹來,每壹次查詢都要順序的新建數據庫連接,都要阻塞在數據庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。

第壹次改進

之前的模塊裏,問題最大的就是第壹步——建立數據庫連接套接字了。如果能夠壹次性建立連接,之後查詢能夠反復服用這個連接就好了。

所以,首先應該把數據庫查詢模塊作為壹個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那麽兩條進程怎麽傳遞消息呢?翻了幾天Python文檔,終於構思出來:用隊列queue作為生產者(web前端)向消費者(數據庫後端)傳遞任務的渠道。生產者,會與SQL命令壹起,同時傳遞壹個管道pipe的連接對象,作為任務完成後,回傳結果的渠道。確保,任務的接收方與發送方保持壹致。

作為第二個問題的解決方法,可以使用線程池來並發獲取任務隊列中的task,然後執行命令並回傳結果。

第二次改進

第壹次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。

但是對於第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在壹個解釋器進程裏調度。線程稍微開多壹點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多壹點,甚至會出現“抖動”問題(也就是剛剛喚醒壹個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁盤),效率大大降低。也就是說,線程池的並發量很有限。

試過了多進程、多線程,只能在單個線程裏做文章了。

Python中的asyncio庫

Python裏有大量的協程庫可以實現單線程內的並發操作,比如Twisted、Gevent等等。Python官方在3.5版本裏提供了asyncio庫同樣可以實現協程並發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加壹個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函數的執行稍微麻煩點。需要首先獲取壹個loop對象,然後由這個對象代為執行async def函數。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程並發。

不過需要註意的是,run_until_complete()函數本身是壹個阻塞函數。也就是說,當前線程會等候壹個run_until_complete()函數執行完畢之後,才會繼續執行下壹部函數。所以下面這段代碼並不能並發執行。

for task in task_list:

loop.run_until_complete(task)

對與這個問題,asyncio庫也有相應的解決方案:gather函數。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

當然了,async def函數的執行並不只有這兩種解決方案,還有call_soon與run_forever的配合執行等等,更多內容還請參考官方文檔。

Python下的I/O多路復用

協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。

目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。

在Linux裏查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。

創建epoll對象,並返回其對應的文件描述符(file descriptor)。

int epoll_create(int size);

int epoll_create1(int flags);

控制監聽事件。第壹個參數epfd就對應於前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候。這是壹個阻塞函數,調用者會等候內核通知所註冊的事件被觸發。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select庫裏:

select.epoll()對應於第壹類創建函數;

epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數epoll_ctl的封裝;

epoll.poll()則是對等候函數epoll_wait的封裝。

Python裏epoll相關API的最大問題應該是在epoll.poll()。相比於其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是後者的第二個參數struct epoll_event *events。沒法實現精確控制。因此只能使用替代方案:select.select()函數。

根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要註冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。

其分別對應於:

rlist: 等候直到可讀

wlist: 等候直到可寫

xlist: 等候直到異常。這個異常的定義,要查看系統文檔。

select.select(),類似於epoll.poll(),先註冊文件和事件,然後保持等候內核通知,是阻塞函數。

實際應用

Psycopg2庫支持對異步和協程,但和壹般情況下的用法略有區別。普通數據庫連接支持不同線程中的不同遊標並發查詢;而異步連接則不支持不同遊標的同時查詢。所以異步連接的不同遊標之間必須使用I/O復用方法來協調調度。

所以,我的大致實現思路是這樣的:首先並發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建遊標,然後執行命令,並返回結果。在獲取遊標和接受查詢結果之前,均要阻塞等候內核通知連接可用。

其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。

我的代碼位於:bottle-blog/dbservice.py

存在問題

當然了,這個流程目前還壹些問題。

首先就是每次輪詢拿到任務之後,都會走這麽壹個流程。

獲取連接 --> 新建遊標 --> 執行任務 --> 關閉遊標 --> 取消連接引用

本來,最好的情況應該是:在輪詢之前,就建好遊標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。

所以這壹塊,還有工作要做。

還有就是epoll沒能用上,有些遺憾。

以後打算寫點C語言的內容,或者用Python/C API,或者用Ctypes包裝***享庫,來實現epoll的調用。

最後,請允許我吐槽壹下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。

  • 上一篇:win10系統便簽功能打不開怎麽辦
  • 下一篇:win10打不開便簽功能的解決方法
  • copyright 2024編程學習大全網