Python 攜程,python攜程多核_python 協程

 2023-11-18 阅读 39 评论 0

摘要:最近對Python中的協程挺感興趣,這里記錄對協程的個人理解。要理解協程,首先需要知道生成器是什么。生成器其實就是不斷產出值的函數,只不過在函數中需要使用yield這一個關鍵詞將值產出。下面來看一個例子:我們調用gen()函數并不會直接執行該函數&

最近對Python中的協程挺感興趣,這里記錄對協程的個人理解。

要理解協程,首先需要知道生成器是什么。生成器其實就是不斷產出值的函數,只不過在函數中需要使用yield這一個關鍵詞將值產出。下面來看一個例子:

我們調用gen()函數并不會直接執行該函數,而是會得到一個生成器對象。對這個生成器對象調用next()函數,這個生成器對象會開始執行到第一個yield處,于是產出一個值0,注意:這時候gen()就暫停在yield處,直到第二次調用next()函數。

Python 攜程?到這里我們可以發現,生成器函數是可以暫停的函數,它在調用方的驅使下(調用方使用next()函數),每次執行到yield處將yield后方的值產出給調用方后就暫停自己的執行,直到調用方下一次驅動它執行。

send

我們知道,生成器函數可以不斷的產出值給調用方,那如果想要調用方傳遞值給生成器函數呢?這就自然而然的引入了send()函數。來看send()函數的使用:

執行上面的代碼,我們可以看到結果如下:

python攜程gevent,來看看上面代碼的執行:首先調用gen()得到一個生成器對象,這時候生成器函數還沒有開始執行,接著調用next()函數,生成器函數執行到第一個yield處,產出字符串hello后暫停執行,調用方得到產出的值打印輸出。然后調用方通過send()發送了一個字符串world給生成器函數,這時候,生成器函數將world賦值給s,繼續它的執行,直到第二個yield處,將調用方傳遞進來的world返回給調用方。

到這里我們可以發現,此時的生成器函數既可以暫停時產出值,又可以接收調用方傳遞進來的值恢復執行,這就和協程的思想差不多了。

yield from

python3.3中提出了這樣一個表達式yield from,我所知道的這個表達式有兩個用法:

協程 python。第一個用法是簡化for循環:

上面的寫法等同于:

這是yield from 的第一種用法,即后面跟一個可迭代的對象,yield from可以在調用方的驅使下將可迭代對象一個一個的輸出。

第二個用法是作為委派生成器使用:

python 4,在上面的代碼中,委派生成器使用了yield from,這就使得調用方在得到生成器對象時,可以通過send()方法和真正的生成器(這里為func)直接通信。于是我們在調用方中使用for循環了3次,每次傳遞進去的值都會傳遞給func函數中的s,當我們最后傳遞進一個None時,真正的生成器跳出for循環并將n的值返回,這時候委派生成器得到func生成器的返回值并將它賦給result。這樣就完成了調用方和真正的生成器函數之間的通信,并且真正的生成器在執行結束之后會將結果返回給委派生成器。

我們可以看到,使用了yield from的委派生成器其實就是為調用方和真正的生成器提供了一個通道,這個通道可以讓它們直接通信。

Event Loop

在真正的理解協程之前,還有個東西時我覺得必須要理解的,那就是事件循環(Event Loop)。

python3?協程是單線程的,單線程就意味著所有的任務需要在單線程上排隊執行,也就是前一個任務沒有執行完成,后一個任務就沒有辦法執行。在CPU密集型的任務之中,這樣其實還行,但是如果我們的任務都是IO密集型的呢?也就是我們大部分的任務都是在等待網絡的數據返回,等待磁盤文件的數據,這就會造成CPU一直在等待這些任務的完成再去執行下一個任務。

有沒有什么辦法能夠讓單線程的任務執行不這么笨呢?其實我們可以將這些需要等待IO設備的任務掛在一邊嘛!這時候,如果我們的任務都是需要等待的任務,那么單線程在執行時遇到一個就把它掛起來,這里可以通過一個數據結構(例如隊列)將這些處于執行等待狀態的任務放進去,為什么是執行等待狀態呢?因為它們正在執行但是又不得不等待例如網絡數據的返回等等。直到將所有的任務都放進去之后,單線程就可以開始它的接連不斷的表演了:有沒有任務完成的小伙伴呀!快來我這里執行!

此時如果有某個任務完成了,它會得到結果,于是發出一個信號:我完成了。那邊還在循環追問的單線程終于得到了答復,就會去看看這個任務有沒有綁定什么回調函數呀?如果綁定了回調函數就進去把回調函數給執行了,如果沒有,就將它所在的任務恢復執行,并將結果返回。

到這里事件循環的大致作用已經說完了,我們可以看到,僅僅有協程是不夠的,我們還需要事件循環和它配合使用,這樣才能讓多個協程可以并發的執行。

python async,Python3.4中,引入了asyncio包,這個包提供了關于事件循環的實現,這就使得在Python中使用協程實現高并發成為可能。我們來模擬一個爬蟲:

在上面的模擬爬蟲的代碼中,我們使用了裝飾器@asyncio.coroutine來將這個get_html()函數定義為協程,在協程中使用了asyncio.sleep()函數模擬從網絡請求數據。在執行的過程中,我們首先使用asyncio提供的get_event_loop()創建一個事件循環,這里我們不需要自己實現事件循環,接著創建兩個協程,并將這兩個協程扔到事件循環中執行。

運行上面代碼,可以看到以下結果:

仔細觀察,我們會發現在協程中并沒有使用time.sleep()函數,而是使用了asyncio.sleep()函數,是因為time.sleep()函數會將整個線程休眠幾秒,而asyncio.sleep()其實也是一個協程,這個協程將和事件循環直接通信并將一個Future對象交給事件循環,事件循環會一直監視著它直到它的任務完成(在這里就是休眠兩秒),并不會將整個線程都停止執行。

python asyncio、到現在,我們可以使用基于生成器的協程和事件循環來做到高并發了。但是問題來了,這里是基于生成器的協程,生成器其實有自己的用法,為什么還要給它強加一個協程的用法呢?

async/await

Python3.5中引入了async/await這一組關鍵詞,這就使得python可以定義原生協程了。await的用法和yield from用法類似,但是await后面只能跟Awaitable的對象(實現了__await__魔法方法),而yield from后面可以跟生成器、協程等等。

使用async/await修改上面的代碼:

python多核多線程編程?在Tornado的官方文檔中,其實是建議用戶使用async/await來定義原生協程,原因有以下幾點:

1.原生協程要快于基于生成器的協程

2.原生協程可以使用async for和async with語法

參考文章

python scrapy。協程的優點:

1.協程是進程和線程的升級版,進程和線程都面臨著內核態和用戶態的切換問題而耗費許多切換時間,

而協程就是用戶自己控制切換的時機,不再需要陷入系統的內核態。

2.協程的執行效率非常高。因為子程序切換不是線程切換,而是由程序自身控制。因此,沒有線程切換的開銷,和多線程相比,線程數量越多,相同數量的協程體現出的優勢越明顯

3.不需要多線程的鎖機制。由于只有一個線程,也不存在同時寫變量的沖突,在協程中控制共享資源不需要加鎖,只需要判斷數據的狀態,所以執行效率遠高于線程 ,對于多核CPU可以使用多進程+協程來盡可能高效率地利用CPU。

生產者消費者模型通過協程的yield思想實現:

importtimedefconsumer():

r= ''

whileTrue:

n= yieldrif notn:return

print('[消費者]

time.sleep(1)

r= 'ok'

defproducer(c):

next(c)

n=0while n < 5:

n= n + 1

print('[生產者] ---> Producing %s...' %n)#和next有同樣的同能,可以觸發函數到下一個yield,

#區別于next的是可以像yield的左邊的變量傳值,例如上面yield左邊的n

c_ret =c.send(n)print('[生產者] Consumer return %s' %c_ret)

c.close()if __name__ == '__main__':

c=consumer()

producer(c)

結果如下:

[生產者] ---> Producing 1...

[消費者]

[生產者] Consumer return ok

[生產者] ---> Producing 2...

[消費者]

[生產者] Consumer return ok

[生產者] ---> Producing 3...

[消費者]

[生產者] Consumer return ok

[生產者] ---> Producing 4...

[消費者]

[生產者] Consumer return ok

[生產者] ---> Producing 5...

[消費者]

[生產者] Consumer return ok

可以看出通過協程協程實現的生產者消費者模型是一種可控的生產消費模型,在消費者producer調用send之后啟動生產者,實現可控的生產者消費者之間的通信。

使用yield手動實現協程是比較麻煩的,Python提供了greenlet和gevent模塊用來實現協程。

from gevent import monkey;monkey.patch_all()

from gevent.queue import Queue #隊列 gevent中的隊列

import gevent

import random

#這個猴子補丁,all是所有能切換協程的地方都切換,包含了socket,所以一般都用all

qq = Queue(3)

def produceer():

while True:

item = random.randint(0,99)

qq.put(item)

print("生產了:", item)

def consumer():

while True:

item = qq.get()

print("消費了:",item)

p = gevent.spawn(produceer)

c = gevent.spawn(consumer)

gevent.joinall([p,c])

引言

目前很多公司選擇將python項目使用golang重構,很大一方面原因是因為golang的并發能力,golang自帶的語法糖支持使并發編程變的相對簡單,也更能充分的使用多核CPU的計算資源。

相應的,python長期受制于GIL,無法在多線程時使用多核CPU,所以一直以來在談及python的缺陷時,性能總是無法回避的一個問題。當然,一些python著名的第三方組織也一直通過各種手段來改善python的并發性能,如twisted的異步模型使用事件驅動機制來提升python性能,著名的爬蟲框架scrapy便是以twisted作為底層網絡庫來開發的,還有gevent,它使用greenlet在用戶態完成棧和上下文切換來減少切換帶來的性能損耗,同樣還有著名的web協程框架tornado,他使用生成器來保存協程上下文及狀態,使用原生的python語法實現了協程。但從python3.4開始python引入asyncio標準庫,隨后又在3.5引入async/await關鍵字,從根本上規范了python異步編程標準,使python異步編程逐漸流行起來。

關于什么是python協程,相信網上已經有了不少資料,但是只描述抽象的上層建筑難免會讓人接受困難,本文希望可以通過從最簡單的代碼和邏輯,使用最基礎的數據結構,從實現出發,帶領大家理解什么是python協程。

首先需要補充一些基礎知識

什么是生成器

我們都應該聽說過迭代器,這在很多語言中都有類似的概念,簡單的說,迭代器就是可以被迭代的對象,對其使用next操作可以返回一個元素,通常多次迭代后迭代器會中止,此時迭代器無法再使用。比如python中可以通過iter方法來將一個列表轉換成迭代器:

I

n [1]: lst = [1, 2, 3]

In [2]: iterator =iter(lst)

In [3]: next(iterator)

Out[3]: 1In [4]: next(iterator)

Out[4]: 2In [5]: next(iterator)

Out[5]: 3In [6]: next(iterator)---------------------------------------------------------------------------StopIteration Traceback (most recent call last) in ()----> 1next(iterator)

StopIteration:

進python群:835017344,獲取python學習資料

生成器可以看作是迭代器的子類,同時提供了比迭代器更強大的功能,python中,可以使用yield關鍵字使函數返回生成器對象。

In [8]: deffun():

...:yield 1...:yield 2...:yield 3...:

In [9]: iterator =fun()

In [10]: next(iterator)

Out[10]: 1In [11]: next(iterator)

Out[11]: 2In [12]: next(iterator)

Out[12]: 3In [13]: next(iterator)---------------------------------------------------------------------------StopIteration Traceback (most recent call last) in ()----> 1next(iterator)

StopIteration:

每次next調用, fun函數只執行四分之一,如果我們擁有多個生成器對象,?按照一定規則?可控的對他分別調用next,生成器每次的暫停都保存了執行進度和內部狀態。如果將這三個生成器理解成協程,那不正是我們熟悉的協程間的切換?

事件循環

所以,我們可以想象,現在有一個循環和一個生成器列表,每次循環,我們都將所有的生成器進行一次調用,所有生成器交替執行。如下:

In [16]: gen_list =[fun(), fun(), fun()]

In [17]: whileTrue:

...:for gen ingen_list:

...:print(next(gen))

...:1

1

1

2

2

2

3

3

3

---------------------------------------------------------------------------StopIteration Traceback (most recent call last) in ()1 whileTrue:2 for gen ingen_list:----> 3 print(next(gen))4StopIteration:

當然,我們還可以換一種寫法,將生成器的每一步都當成是一次調用,把生成器包裝成一個Handle對象,每次調用handle對象的call來完成生成器的調用,同時,我們還可以在調用完成后做一些準備來控制下一次調用的時間,將Handle對應放到一個scheduled_list里面:deffun():print("step1")yield

print("step2")yield

print("step3")yieldscheduled_list=[]classHandle(object):def __init__(self, gen):

self.gen=gendefcall(self):

next(self.gen)

scheduled_list.append(self)def loop(*coroutines):

scheduled_list.extend(Handle(c)for c incoroutines)whileTrue:whilescheduled_list:

handle=scheduled_list.pop(0)

handle.call()if __name__ == "__main__":

loop(fun(), fun(), fun())

協程中的阻塞

在有了以上的基礎后,我們來分析上面提到的切換規則,什么時候應該切換協程(生成器)?顯而易見,當遇到阻塞時,我們才需要切換協程,以避免CPU的浪費。我將阻塞分為了以下三種:

IO調用,如socket,file,pipe等。

人為制造的阻塞,如sleep。

異步調用。

假設,在我們的生成器內有一次socket調用,我們不知道它多久會ready,我們希望不等待它的返回,切換到其它協程運行,等其準備好之后再切換回來,該怎么辦?

有同學可能會想到了,將socket注冊到epoll上。如下:importtimeimportsocketfrom functools importpartialfrom select importepoll

poll=epoll()

handlers=dict()

scheduled_list=[]deffun():print("step1")

sock=socket.socket()

future=Future()defhandler():

future.set_done(sock.recv(1024))

add_handler(sock.fileno(), handler, READ)yieldfutureprint("step2")yield

print("step3")yield

defadd_handler(fd, handler, events):

handlers[fd]=handler

poll.register(fd, events)classFuture(object):def __init__(self):

self.callbacks=[]defadd_callback(self, callback):

self.callbacks.append(callback)defset_done(self, value):

self.value=valuefor callback inself.callbacks:

callback()defget_result(self):returnself.valueclassHandle(object):def __init__(self, gen):

self.gen=gendefcall(self):

yielded=next(self.gen)ifisinstance(yielded, Future):

yielded.add_callback(partial(scheduled_list.append, self))else:

scheduled_list.append(self)def loop(*coroutines):

scheduled_list.extend(Handle(c)for c incoroutines)whileTrue:

default_timeout= 10000

whilescheduled_list:

handle=scheduled_list.pop(0)

handle.call()#等待描述符可操作

events =poll.poll(default_timeout)whileevents:

fd, event=events.popitem()

handlers[fd]()

poll.unregister(fd)delhandlers[fd]if __name__ == "__main__":

loop(fun(), fun(), fun())

這一步引入一個新的對象Future,他用來代指未來即將發生的調用,通過epoll上注冊的事件,觸發了它的完成,完成之后執行了將handle對象放回scheduled_list, 可從而切回了協程。

那么,人為制造的阻塞我們怎么切換協程呢?這里,我們又引入了一個新的對象Timeoutimporttimeimportsocketfrom functools importpartialfrom select importepoll

poll=epoll()

handlers=dict()

scheduled_list=[]#創建一個timeout_list

timeout_list =[]deffun():print("step1")

sock=socket.socket()

future=Future()defhandler():

future.set_done()

add_handler(sock.fileno(), handler, READ)yieldfutureprint("step2")yield sleep(3)print("step3")yield

defadd_handler(fd, handler, events):

handlers[fd]=handler

poll.register(fd, events)defsleep(sec):

future=Future()

timeout=Timeout(sec, future.set_done)

timeout_list.append(timeout)returnfutureclassTimeout(object):def __init__(self, timeout, callback):

self.deadline= time.time() +timeout

self.callback=callbackdefcall(self):

self.callback(None)classFuture(object):def __init__(self):

self.callbacks=[]

self.value=Nonedefadd_callback(self, callback):

self.callbacks.append(callback)defset_done(self, value):

self.value=valuefor callback inself.callbacks:

callback()defget_result(self):returnself.valueclassHandle(object):def __init__(self, gen):

self.gen=gendefcall(self):

yielded=next(self.gen)ifisinstance(yielded, Future):

yielded.add_callback(partial(scheduled_list.append, self))else:

scheduled_list.append(self)def loop(*coroutines):

scheduled_list.extend(Handle(c)for c incoroutines)whileTrue:

default_timeout= 10000deadline=time.time()for timeout intimeout_list[:]:if timeout.deadline <=deadline:

timeout.call()

timeout_list.remove(timeout)whilescheduled_list:

handle=scheduled_list.pop(0)

handle.call()for timeout intimeout_list:

wait_time= timeout.deadline -deadlineif wait_time <=0:

wait_time=0

default_timeout=min(default_timeout, wait_time)if not scheduled_list and not timeout_list and nothandlers:break

#等待描述符可操作

events =poll.poll(default_timeout)whileevents:

fd, event=events.popitem()

handlers[fd]()

poll.unregister(fd)delhandlers[fd]if __name__ == "__main__":

loop(fun(), fun(), fun())

通過創建一個Timeout對象,我們在deadline時觸發了其回調,使Future完成,從而完成了協程的切換。

由以上兩點,我們可以大致觀察出一個規律,創建Future對象,切出協程,在合適的時機(如socket ready或到達deadline/timeout)讓他完成,切入協程,這才是協程切換的關鍵所在,由此,我們可以使用Future來管理各種異步調用。

如,我們在python編碼時遇到了一個計算密集型的函數,由于python單進程無法利用多核,我們可以創建一個子進程來處理計算,同時關聯到一個Future中:deffun():print("step1")

sock=socket.socket()

future=Future()defhandler():

future.set_done()

add_handler(sock.fileno(), handler, READ)yieldfutureprint("step2")yield sleep(3)print("step3")

future=Future()from multiprocessing importProcess

Process(target=long_time_call, args=(future, )).start()yieldfuturedeflong_time_call(future):#...

future.set_done()

當協程執行到第三步時,遇到了長時間運行的函數調用,我們創建了一個Future,關聯到一個子進程中,并在子進程完成時設置future完成,在子進程完成之前,父進程已完成協程的切出,將執行權交給其它協程執行。

這個地方遺漏了一個細節,當沒有其它協程可以執行時,epoll會被設置成超時時間=10000,因而陷入到長時間的睡眠中,而子進程完成后需要切入協程,但父進程已經被epoll阻塞掉,如何喚醒主進程繼續執行該協程呢?業界通用的做法是,創建一個管道,在切出協程時讓epoll監聽讀fd,子進程完成后,往管道中寫入一個字符,epoll監聽的讀fd 馬上變成ready,因此epoll解除阻塞,事件循環得以繼續執行。

當然,異步調用不僅僅可以使用子進程,子線程、遠程計算框架都可以通過這種方式執行。

講到這里,大家應該基本明白了一個協程函數是如何工作的了。下表可幫助我們從線程的角度理解協程

上面的表格表述線程和協程的一一對應關系,最后一欄可能還需要舉例解釋一下:

我們知道一個線程執行過程中,會嵌套調用多個函數,如:deffoo():print("in foo")defbar():print("in bar")deffun():

bar()

foo()if __name__ == "__main__":

fun()

那么生成器如何嵌套調用呢?python3.4之前,嵌套的生成器只能這么使用:deffoo():print("in foo")yield

defbar():print("in bar")yield

deffun():for i inbar():yieldifor i infoo():yieldiif __name__ == "__main__":for i infun():passpython3.4之后引入了新的語法糖yieldfrom,簡化了調用方式:deffoo():print("in foo")yield

defbar():print("in bar")yield

deffun():yield frombar()yield fromfoo()if __name__ == "__main__":for i infun():pass

yield from可以驅動子生成器,來逐一返回子生成器中的值,將嵌套的生成器打平。值得一提的是,yieldfrom才是await的真實身份。

讓我們用最初的例子來編寫一個嵌套了的子生成器函數的協程demo。我們將fun生成器抽離成2種阻塞操作,并封裝好:defread(sock):

future=Future()defhandler():

buf= sock.recv(1024)

future.set_done(buf)

add_handler(sock.fileno(), handler,0x001)yieldfuturereturnfuture.get_result()defsleep(sec):

future=Future()

timeout=Timeout(sec, future.set_done)

timeout_list.append(timeout)yieldfuture

有了這兩個基礎函數之后,我們就可以自由的編寫我們協程了defcoroutine(num):

client=socket.socket()

client.connect(("", 1234))print(f"coroutine_{num} start")

buffer= yield fromread(client)print(f"coroutine_{num} recv:", buffer)yield from sleep(3)print(f"coroutine_{num} wake up")

client.close()if __name__ == "__main__":

loop(coroutine(1), coroutine(2))

我們創建了兩個協程,其中調用了一次socket讀和一個睡眠,讓我們看一下執行效果:

coroutine_1 start

coroutine_2 start

coroutine_2 recv: b'test'coroutine_1 recv: b'test'coroutine_2 wake up

coroutine_1 wake up

兩個協程異步交替執行。

asyncio的使用

相信看完上面的例子之后,大家應該對python協程的實現有了初步的認識,那標準的python協程如何使用呢?importsocketimportasyncio

asyncdefread(sock):

loop=asyncio.get_event_loop()

future=loop.create_future()defhandler():

buf= sock.recv(1024)

future.set_result(buf)

loop.remove_reader(sock.fileno())

loop.add_reader(sock.fileno(), handler)

await futurereturnfuture.result()

asyncdefcoroutine(num):

client=socket.socket()

client.connect(("", 1234))print(f"coroutine_{num} start")

buffer=await read(client)print(f"coroutine_{num} recv:", buffer)

await asyncio.sleep(3)print(f"coroutine_{num} wake up")

client.close()if __name__ == "__main__":

loop=asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(coroutine(1), coroutine(2)))

幾乎和我們的實現一模一樣,其中await取代了yieldfrom, 協程顯式使用async來聲明。

python協程的應用

python協程優勢

通過上述例子我們可以很容易看出,python協程具有以下特點:

超級輕量,不需要維護協程棧,所有的上下文執行狀態都被維護在了生成器中。

切換自由,通過yieldfrom(python3.5以后是await)隨意切換協程,協程切換完全可控以至于幾乎不用加鎖。

并發能力強,并發上限理論上取決于IO多路復用可注冊的文件描述符的極限。

缺點

還是只能用到單核。

由于協程切換是非搶占式的,所以如果業務是CPU密集型的,可能其它協程長時間得不到執行。

綜上所述,在使用python的高并發場景下,python多進程+協程是最優的解決方案

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/177880.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息