快速掌握用python寫并行程序

小子今天想來談談“并行計算”,作為一個非科班人員,我為什么去搗鼓這么一個在科班里也比較專業的問題了。這就要說下我前幾天做的一個作業了,當時我用python寫了個程序,結果運行了一天,這個速度可讓我愁了,我還怎么優化,怎么交作業啊。于是小子就去各大論壇尋丹問藥了,終于讓我發現可以用并行計算來最大化壓榨電腦的CPU,提升計算效率,而且python里有multiprocessing這個庫可以提供并行計算接口,于是小子花1天時間改進程序,終于在規定時間內做出了自己滿意的結果,上交了作業。之后,小子對并行計算充滿了興趣,于是又重新在Google上游歷了一番,大致弄清了GPU、CPU、進程、線程、并行計算、分布式計算等概念,也把python的multiprocessing耍了一遍,現在小子也算略有心得了,所以來此立碑,以示后來游客。
小子本文分為四部分,一是大數據時代現狀,其二是面對挑戰的方法,然后是用python寫并行程序,最后是multiprocessing實戰。

一、大數據時代的現狀

當前我們正處于大數據時代,每天我們會通過手機、電腦等設備不斷的將自己的數據傳到互聯網上。據統計,YouTube上每分鐘就會增加500多小時的視頻,面對如此海量的數據,如何高效的存儲與處理它們就成了當前最大的挑戰。
但在這個對硬件要求越來越高的時代,CPU卻似乎并不這么給力了。自2013年以來,處理器頻率的增長速度逐漸放緩了,目前CPU的頻率主要分布在3~4GHz。這個也是可以理解的,畢竟摩爾定律都生效了50年了,如果它老人家還如此給力,那我們以后就只要靜等處理器頻率提升,什么計算問題在未來那都不是話下了。實際上CPU與頻率是于能耗密切相關的,我們之前可以通過加電壓來提升頻率,但當能耗太大,散熱問題就無法解決了,所以頻率就逐漸穩定下來了,而Intel與AMD等大制造商也將目標轉向了多核芯片,目前普通桌面PC也達到了4~8核。

二、面對挑戰的方法

咱們有了多核CPU,以及大量計算設備,那我們怎么來用它們應對大數據時代的挑戰了。那就要提到下面的方法了。

2.1 并行計算

并行(parallelism)是指程序運行時的狀態,如果在同時刻有多個“工作單位”運行,則所運行的程序處于并行狀態。圖一是并行程序的示例,開始并行后,程序從主線程分出許多小的線程并同步執行,此時每個線程在各個獨立的CPU進行運行,在所有線程都運行完成之后,它們會重新合并為主線程,而運行結果也會進行合并,并交給主線程繼續處理。

多線程并行(OpenMap)

圖一、多線程并行

圖二是一個多線程的任務(沿線為線程時間),但它不是并行任務。這是因為task1與task2總是不在同一時刻執行,這個情況下單核CPU完全可以同時執行task1與task2。方法是在task1不執行的時候立即將CPU資源給task2用,task2空閑的時候CPU給task1用,這樣通過時間窗調整任務,即可實現多線程程序,但task1與task2并沒有同時執行過,所以不能稱為并行。我們可以稱它為并發(concurrency)程序,這個程序一定意義上提升了單個CPU的使用率,所以效率也相對較高。
并發程序

圖二、多線程并發

并行編程模型:

  • 數據并行(Data Parallel)模型:將相同的操作同時作用于不同數據,只需要簡單地指明執行什么并行操作以及并行操作對象。該模型反映在圖一中即是,并行同時在主線程中拿取數據進行處理,并線程執行相同的操作,然后計算完成后合并結果。各個并行線程在執行時互不干擾。
  • 消息傳遞(Message Passing)模型:各個并行執行部分之間傳遞消息,相互通訊。消息傳遞模型的并行線程在執行時會傳遞數據,可能一個線程運行到一半的時候,它所占用的數據或處理結果就要交給另一個線程處理,這樣,在設計并行程序時會給我們帶來一定麻煩。該模型一般是分布式內存并行計算機所采用方法,但是也可以適用于共享式內存的并行計算機。

什么時候用并行計算:

  1. 多核CPU——計算密集型任務。盡量使用并行計算,可以提高任務執行效率。計算密集型任務會持續地將CPU占滿,此時有越多CPU來分擔任務,計算速度就會越快,這種情況才是并行程序的用武之地。
  2. 單核CPU——計算密集型任務。此時的任務已經把CPU資源100%消耗了,就沒必要使用并行計算,畢竟硬件障礙擺在那里。
  3. 單核CPU——I/O密集型任務。I/O密集型任務在任務執行時需要經常調用磁盤、屏幕、鍵盤等外設,由于調用外設時CPU會空閑,所以CPU的利用率并不高,此時使用多線程程序,只是便于人機交互。計算效率提升不大。
  4. 多核CPU——I/O密集型任務。同單核CPU——I/O密集型任務。

2.2 改用GPU處理計算密集型程序

GPU即圖形處理器核心(Graphics Processing Unit),它是顯卡的心臟,顯卡上還有顯存,GPU與顯存類似與CPU與內存。
GPU與CPU有不同的設計目標,CPU需要處理所有的計算指令,所以它的單元設計得相當復雜;而GPU主要為了圖形“渲染”而設計,渲染即進行數據的列處理,所以GPU天生就會為了更快速地執行復雜算術運算和幾何運算的。
GPU相比與CPU有如下優勢:

  1. 強大的浮點數計算速度。
  2. 大量的計算核心,可以進行大型并行計算。一個普通的GPU也有數千個計算核心。
  3. 強大的數據吞吐量,GPU的吞吐量是CPU的數十倍,這意味著GPU有適合的處理大數據。

GPU目前在處理深度學習上用得十分多,英偉達(NVIDIA)目前也花大精力去開發適合深度學習的GPU。現在上百層的神經網絡已經很常見了,面對如此龐大的計算量,CPU可能需要運算幾天,而GPU卻可以在幾小時內算完,這個差距已經足夠別人比我們多打幾個比賽,多發幾篇論文了。

3.3 分布式計算

說到分布式計算,我們就先說下下Google的3篇論文,原文可以直接點鏈接去下載:

Google在2003~2006年發表了這三篇論文之后,一時之間引起了轟動,但是Google并沒有將MapReduce開源。在這種情況下Hadoop就出現了,Doug Cutting在Google的3篇論文的理論基礎上開發了Hadoop,此后Hadoop不斷走向成熟,目前Facebook、IBM、ImageShack等知名公司都在使用Hadoop運行他們的程序。

分布式計算的優勢:
可以集成諸多低配的計算機(成千上萬臺)進行高并發的儲存與計算,從而達到與超級計算機媲美的處理能力。

三、用python寫并行程序

在介紹如何使用python寫并行程序之前,我們需要先補充幾個概念,分別是進程、線程與全局解釋器鎖(Global Interpreter Lock, GIL)。

3.1 進程與線程

進程(process):

  • 在面向線程設計的系統(如當代多數操作系統、Linux?2.6及更新的版本)中,進程本身不是基本運行單位,而是線程的容器。
  • 進程擁有自己獨立的內存空間,所屬線程可以訪問進程的空間。
  • 程序本身只是指令、數據及其組織形式的描述,進程才是程序的真正運行實例。 例如,Visual Studio開發環境就是利用一個進程編輯源文件,并利用另一個進程完成編譯工作的應用程序。

線程(threading):

  • 線程有自己的一組CPU指令、寄存器與私有數據區,線程的數據可以與同一進程的線程共享。
  • 當前的操作系統是面向線程的,即以線程為基本運行單位,并按線程分配CPU。

進程與線程有兩個主要的不同點,其一是進程包含線程,線程使用進程的內存空間,當然線程也有自己的私有空間,但容量小;其二是進程有各自獨立的內存空間,互不干擾,而線程是共享內存空間。
圖三展示了進程、線程與CPU之間的關系。在圖三中,進程一與進程二都含有3個線程,CPU會按照線程來分配任務,如圖中4個CPU同時執行前4個線程,后兩個標紅線程處于等待狀態,在CPU運行完當前線程時,等待的線程會被喚醒并進入CPU執行。通常,進程含有的線程數越多,則它占用CPU的時間會越長。


圖三、進程、線程與CPU關系

3.2 全局解釋器鎖GIL:

GIL是計算機程序設計語言解釋器用于同步線程的一種機制,它使得任何時刻僅有一個線程在執行。即便在多核心處理器上,使用 GIL 的解釋器也只允許同一時間執行一個線程。Python的Cpython解釋器(普遍使用的解釋器)使用GIL,在一個Python解釋器進程內可以執行多線程程序,但每次一個線程執行時就會獲得全局解釋器鎖,使得別的線程只能等待,由于GIL幾乎釋放的同時就會被原線程馬上獲得,那些等待線程可能剛喚醒,所以經常造成線程不平衡享受CPU資源,此時多線程的效率比單線程還要低下。在python的官方文檔里,它是這樣解釋GIL的:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

可以說它的初衷是很好的,為了保證線程間的數據安全性;但是隨著時代的發展,GIL卻成為了python并行計算的最大障礙,但這個時候GIL已經遍布CPython的各個角落,修改它的工作量太大,特別是對這種開源性的語音來說。但幸好GIL只鎖了線程,我們可以再新建解釋器進程來實現并行,那這就是multiprocessing的工作了。

3.3 multiprocessing

multiprocessing是python里的多進程包,通過它,我們可以在python程序里建立多進程來執行任務,從而進行并行計算。官方文檔如下所述:

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
我們接下來介紹下multiprocessing的各個接口:

3.3.1 進程process

multiprocessing.Process(target=None,??args=())
    target: 可以被run()調用的函數,簡單來說就是進程中運行的函數
    args: 是target的參數

process的方法:
    start(): 開始啟動進程,在創建process之后執行
    join([timeout]):阻塞目前父進程,直到調用join方法的進程執行完或超時(timeout),才繼續執行父進程
    terminate():終止進程,不論進程有沒有執行完,盡量少用。

示例1

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',)) # p進程執行f函數,參數為'bob',注意后面的“,”
    p.start() # 進程開始
    p.join() # 阻塞主線程,直至p進程執行結束

3.3.2 進程池Process Pools

class multiprocessing.Pool([processes])
    processes是進程池中的進程數,默認是本機的cpu數量
方法:
    apply(func[, args[, kwds]])進程池中的進程進行func函數操作,操作時會阻塞進程,直至生成結果。
    apply_async(func[, args[, kwds[, callback]]])與apply類似,但是不會阻塞進程
    map(func, iterable[, chunksize])進程池中的進程進行映射操作
    map_async(func, iterable[, chunksize[, callback]])
    imap(func, iterable[, chunksize]):返回有序迭代器
    imap_unordered(func, iterable[, chunsize]):返回無序迭代器
    close():禁止進程池再接收任務
    terminate():強行終止進程池,不論是否有任務在執行
    join():在close()或terminate()之后進行,等待進程退出

示例2

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5) # 創建有5個進程的進程池
    print(p.map(f, [1, 2, 3])) # 將f函數的操作給進程池

3.3.3 Pipes & Queues

multiprocessing.Pipe([duplex])
    返回兩個連接對象(conn1, conn2),兩個連接對象分別訪問pipe的頭和尾,進行讀寫操作
    Duplex: True(default),創建的pipe是雙向的,也即兩端都可以進行讀寫;若為False,則pipe是單向的,僅可以在一端讀,另一端寫,此時與Queue類似。

multiprocessing.Queue([maxsize])
    qsize():返回queue中member數量
    empty():如果queue是空的,則返回true
    full():如果queue中member數量達到maxsize,則返回true
    put(obj):將一個object放入到queue中
    get():從隊列中取出一個object并將它從queue中移除,FIFO原則
    close():關閉隊列,并將緩存的object寫入pipe

示例

from multiprocessing import Pool
import time
def f(x):
    return x*x
if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow
    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

3.3.4 進程鎖multiprocessing.Lock

當一個進程獲得(acquire)鎖之后,其它進程在想獲得鎖就會被禁止,可以保護數據,進行同步處理。
     acquire(block=True, timeout=None):嘗試獲取一個鎖,如果block為true,則會在獲得鎖之后阻止其它進程再獲取鎖。
     release():釋放鎖

3.3.5 共享內存——Value, Array

共享內存通常需要配合進程鎖來處理,保證處理的順序相同。

multiprocessing.Value(typecode_or_type,?*args[,?lock])
    返回一個ctype對象,
    創建c = Value(‘d’, 3.14),調用c.value()
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
    返回一個ctype數組,只能是一維的
    Array(‘i’, [1, 2, 3, 4])
Type code C Type Python Type Minimum size in bytes
'b' signed char int 1
'B' unsigned char int 1
'u' Py_UNICODE Unicode character 2
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int int 2
'l' signed long int 4
'L' unsigned long int 4
'q' signed long long int 8
'Q' unsigned long long int 8
'f' float float 4
'd' double float 8

3.3.6 其它方法

multiprocessing.active_children():返回當前進程的所有子進程
multiprocessing.cpu_count():返回本計算機的cpu數量
multiprocessing.current_process():返回當前進程

3.3.7 注意事項:

  1. 盡量避免共享數據
  2. 所有對象都盡量是可以pickle的
  3. 避免使用terminate強行終止進程,以造成不可預料的后果
  4. 有隊列的進程在終止前隊列中的數據需要清空,join操作應放到queue清空后
  5. 明確給子進程傳遞資源、參數

windows平臺另需注意:

  • 注意跨模塊全局變量的使用,可能被各個進程修改造成結果不統一
  • 主模塊需要加上if name == 'main':來提高它的安全性,如果有交互界面,需要加上freeze_support()

四、multiprocessing實戰

process、lock與value嘗試:

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 鎖住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 獲取共享內存
        print(v.value)
    l.release() # 釋放


def multicore():
    l = mp.Lock() # 定義一個進程鎖
    #l = 1
    v = mp.Value('i', 0) # 定義共享內存
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__=='__main__':
    multicore()

上述代碼即對共享內存疊加5次,p1進程每次疊加1,p2進程每次疊加3,為了避免p1與p2在運行時搶奪共享數據v,在進程執行時鎖住了該進程,從而保證了執行的順序。我測試了三個案例:

  1. 直接運行上述代碼輸出[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],運行時間為1.037s
  2. 在1的基礎上注釋掉鎖(上述注釋了三行),在沒有鎖的情況下,輸出[1, 4, 5, 8, 9, 12, 13, 15, 14, 16],運行時間為0.53s
  3. 在2的基礎上將p1.join()調到p2.start()前面,輸出為[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],運行時間為1.042s.

可以發現,沒鎖的情況下調整join可以取得與加鎖類似的結果,這是因為join即是阻塞主進程,直至當前進程結束才回到主進程,若將p1.join()放到p1.start()后面,則會馬上阻塞主進程,使得p2要稍后才開始,這與鎖的效果一樣。
如果如上述代碼所示,p1.join()在p2.start()后面,雖然是p1先join(),但這時只是阻塞了主進程,而p2是兄弟進程,它已經開始了,p1就不能阻止它了,所以這時如果沒鎖的話p1與p2就是并行了,運行時間就是一半,但因為它們爭搶共享變量,所以輸出就變得不確定了。

pool

import multiprocessing as mp
#import pdb

def job(i):
    return i*i

def multicore():
    pool = mp.Pool()
    #pdb.set_trace()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get獲得結果
    print(res.get())
    # 迭代器,i=0時apply一次,i=1時apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 從迭代器中取出
    print([res.get() for res in multi_res])

multicore()

pool其實非常好用,特別是map與apply_async。通過pool這個接口,我們只有指定可以并行的函數與函數參數列表,它就可以自動幫我們創建多進程池進行并行計算,真的不要太方便。pool特別適用于數據并行模型,假如是消息傳遞模型那還是建議自己通過process來創立進程吧。

總結

小子這次主要是按自己的理解把并行計算理了下,對進程、線程、CPU之間的關系做了下闡述,并把python的multiprocessing這個包拎了拎,個人感覺這個里面還大有學問,上次我一個師兄用python的process來控制單次迭代的運行時間(運行超時就跳過這次迭代,進入下一次迭代)也是讓我漲了見識,后面還要多多學習啊。
感謝您花費寶貴的時間閱讀到這里,希望能有所收獲,也歡迎在評論區進行交流。

推薦好文:
multiprocessing官方文檔
python多進程的理解 multiprocessing Process join run(推薦好文)
多進程 Multiprocessing

posted @ 2018-11-03 11:01 落葉有聲 閱讀(...) 評論(...) 編輯 收藏
耐克篮球多少钱