12. Paralelní výpočty
Obsah
Balíček threading
Pro práci s vlákny je v pythonu balíček threading, více zde.
Global Interpreter Lock (GIL) je mechanismus používaný v interpretu Pythonu k synchronizaci vláken. Jedná se o jednoduchý zámek, který umožňuje pouze jednomu vláknu (nebo anglicky thread), i když váš počítač má více jader procesoru.
Přestože GIL brání skutečnému paralelnímu zpracování v Pythonu, neznamená to, že vlákna jsou zcela bezcenná. Vlákna mohou být stále užitečná pro některé úlohy, jako jsou I/O operace (čtení a zápis do souborů, síťové operace, responzivnost desktopových aplikací atd.), kde je hlavním omezením čekání na odpověď od externího zdroje a nikoli výpočetní výkon procesoru.
Jednoduchý příklad
Vytvoříme 5 vláken, které se uspí na 1 až 5 vteřiny (simulace nějakého dlouhého výpočtu). Poté program pokračuje až všechny vlákna ukončí svůj výpočet.
import threading
import random
import time
def worker(num):
time.sleep(random.randint(1,5))
print(f'Worker {num} finished.')
if __name__ == '__main__':
threads = []
# Vytvoření pěti vláken
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
# Spuštění vláken
t.start()
# Čekání na dokončení všech vláken
for thread in threads:
thread.join()
print("All workers finished.")
Metoda start() je používána k spuštění vlákna. Po zavolání této metody interpret Pythonu zahájí běh kódu uvnitř vlákna ve vlastním kontextu, což znamená, že kód uvnitř vlákna začne provádět svou činnost současně s hlavním programem nebo s jinými vlákny. Pokud zavoláte metodu start() na objektu vlákna, které už bylo spuštěno nebo které už skončilo, dojde k vyhození výjimky RuntimeError.
Metoda join() se používá k tomu, aby hlavní program (nebo jiné vlákno) počkal, dokud dané vlákno neskončí svou činnost. Pokud se zavolá join() na vláknu, které ještě neskončilo, hlavní program se zablokuje (čeká), dokud vlákno neskončí. Tím se zajišťuje, že se hlavní program nepokračuje dříve, než jsou hotové všechny požadované operace ve vláknech. Pokud vlákno již skončilo, join() se okamžitě vrátí.
Synchonizace
Synchronizace je nezbytná k zajištění správného provádění více vláken nebo procesů, které se snaží přistupovat ke sdíleným prostředkům, jako jsou proměnné nebo sdílená data. Účelem je zabránit situacím, ve kterých více vláken nebo procesů souběžně mění hodnotu sdíleného prostředku, což by mohlo vést k nekonzistentním výsledkům. Kromě zámků balíček threading poskytuje i další synchronizační primitiva jako threading.Semaphore nebo threading.Barrier.
Zámek
Zámek (anglicky lock) funguje tak, že umožňuje pouze jednomu vláknu nebo procesu najednou získat přístup k chráněnému zdroji. Pokud je zámek již uzamčen vláknem nebo procesem, pak jiné vlákno nebo proces, který se pokouší zámek získat, musí čekat, dokud není zámek uvolněn.
V modulu threading se k implementaci zámků používá threading.Lock(). Zámek lze použít tak, že se zavolá metoda acquire() pro uzamčení zámku a release() pro jeho odemčení.
import threading
# Sdílená proměnná
shared_variable = 0
# Zámek pro synchronizaci vláken
lock = threading.Lock()
# Funkce, která zvyšuje hodnotu sdílené proměnné o 1
def increase():
global shared_variable
for _ in range(1000):
# Zamknutí zámku před přístupem k sdílené proměnné
lock.acquire()
shared_variable += 1
# Odemknutí zámku po provedení operace
lock.release()
# Vytvoření dvou vláken
thread1 = threading.Thread(target=increase)
thread2 = threading.Thread(target=increase)
# Spuštění vláken
thread1.start()
thread2.start()
# Čekání na dokončení obou vláken
thread1.join()
thread2.join()
# Výpis hodnoty sdílené proměnné
print("Hodnota sdílené proměnné:", shared_variable)
Semafor
Semafor je synchronizační objekt, který umožňuje omezený přístup ke sdíleným prostředkům. Semafory jsou užitečné v situacích, kdy máme omezený počet zdrojů nebo chceme omezit přístup k určitému zdroji na určitý počet vláken nebo procesů najednou.
import threading
import time
# Vytvoření semaforu s kapacitou 2
semaphore = threading.Semaphore(2)
# Funkce, která využije semafor k omezení přístupu
def access_shared_resource(thread_id):
print(f"Vlákno {thread_id} se chce dostat k sdílenému prostředku.")
# Pokus o získání semaforu
semaphore.acquire()
print(f"Vlákno {thread_id} má přístup k sdílenému prostředku.")
# Simulace práce s sdíleným prostředkem
print(f"Vlákno {thread_id} pracuje...")
time.sleep(2)
# Uvolnění semaforu
semaphore.release()
print(f"Vlákno {thread_id} uvolnilo sdílený prostředek.")
# Vytvoření vláken
threads = []
for i in range(5):
t = threading.Thread(target=access_shared_resource, args=(i,))
threads.append(t)
t.start()
# Čekání na dokončení všech vláken
for t in threads:
t.join()
print("Všechna vlákna skončila.")
Modul multiprocessing
Procesy v Pythonu nejsou ovlivněny GIL, protože každý proces má svůj vlastní interpret Pythonu s vlastním GIL, což umožňuje skutečné paralelní zpracování. To znamená, že pro aplikace, které vyžadují vysoký výkon procesoru a skutečně paralelní zpracování, může být lepší použít modul multiprocessing namísto vláken. Ten je součástí standardní knihovny Pythonu a umožňuje vytváření a řízení procesů, které mohou běžet současně a nezávisle na sobě. Více zde.
Balíček multiprocessing obsahuje všechna synchoniza primitva dostupné v threading, ale navíc má například multiprocessing.Array nebo multiprocessing.Value pro práci s pamětí sdílenou více procesy.
Jednoduchý příklad
Vytvoříme pět procesů, které počítají fibonacciho čísla (neefektivním způsobem) a výsledek poté uloží do sdíleného pole. V kódu je uvedeno i srovnání délky doby výpočtu s klasickým sekvenčním výpočtem.
V kontextu modulu multiprocessing je důležité použit if __name__ == "__main__":, aby se kód uvnitř této části nevykonával ve všech procesech, které jsou vytvářeny pomocí modulu multiprocessing. To by mohlo vést k neočekávanému chování, pokud by kód určený pro spuštění jako hlavní program byl spuštěn ve všech dceřiných procesech.
import multiprocessing
import time
def fibonacci(n):
"""Vypočítá n-tý prvek Fibonacciho posloupnosti rekuzivně."""
if n <= 1:
return 0
elif n == 2:
return 1
else:
return fibonacci(n - 1) + fibonacci(n - 2)
# Funkce, která bude prováděna v jednom z procesů
def worker(shared_array, index):
shared_array[index] = fibonacci(36)
if __name__ == "__main__":
# Vytvoření sdíleného pole o délce 5 s datovým typem 'i' (integer)
shared_array = multiprocessing.Array('i', 5)
start_time = time.time()
# Vytvoření procesů
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(shared_array, i))
processes.append(p)
p.start()
# Čekání na dokončení všech procesů
for p in processes:
p.join()
# Výpis hodnot ze sdíleného pole
print("Sdílené pole po dokončení všech procesů:", list(shared_array))
duration = time.time() - start_time
print(f"Doba paralelního výpočtu: {duration} s")
# Doba paralelního výpočtu: 1.4493789672851562 s
start_time = time.time()
fibonacci(36)
fibonacci(36)
fibonacci(36)
fibonacci(36)
fibonacci(36)
duration = time.time() - start_time
print(f"Doba sekvenčního výpočtu: {duration} s")
# Doba sekvenčního výpočtu: 5.688352108001709 s