Python er et av de mest populære språkene for databehandling og datavitenskap generelt. Økosystemet gir mange biblioteker og rammer som gjør det mulig å utføre høy ytelse databehandling. Å gjøre parallell programmering i Python kan imidlertid være ganske vanskelig.
I denne opplæringen skal vi studere hvorfor parallellitet er vanskelig, spesielt i Python-konteksten, og for det vil vi gå gjennom følgende:
De Global Interpreter Lock (GIL) er en av de mest kontroversielle fagene i Python-verdenen. I CPython, den mest populære implementeringen av Python, er GIL en mutex som gjør ting trådsikker. GIL gjør det enkelt å integrere med eksterne biblioteker som ikke er trådsikker, og det gjør ikke-parallell kode raskere. Dette kommer til en pris, skjønt. På grunn av GIL kan vi ikke oppnå ekte parallellitet via multithreading. I utgangspunktet kan to forskjellige innfødte tråder av samme prosess ikke kjøres Python-kode på en gang.
Det er imidlertid ikke så ille, og her er hvorfor: Ting som skjer utenfor GIL-rike er fri for å være parallell. I denne kategorien faller langvarige oppgaver som I / O, og heldigvis, biblioteker som numpy
.
Så Python er ikke virkelig multithreaded. Men hva er en tråd? La oss ta et skritt tilbake og se på ting i perspektiv.
En prosess er en grunnleggende operativsystemabstraksjon. Det er et program som er i fullføring, med andre ord, kode som kjører. Flere prosesser kjører alltid i en datamaskin, og de utføres parallelt.
En prosess kan ha flere tråder. De utfører samme kode som tilhører foreldreprosessen. Ideelt sett løper de parallelt, men ikke nødvendigvis. Årsaken til at prosesser ikke er nok, er at applikasjoner må være responsive og lytte etter brukerhandlinger mens du oppdaterer skjermen og lagrer en fil.
Hvis det fortsatt er litt uklart, her er en cheatsheet:
PROSESSER | GJENGER |
---|---|
Prosesser deler ikke minne | Tråder deler minne |
Gyting / bytteprosesser er dyrt | Gyting / koblingstråd er billigere |
Prosesser krever flere ressurser | Tråder krever færre ressurser (kalles noen ganger lette prosesser) |
Ingen minnesynkronisering er nødvendig | Du må bruke synkroniseringsmekanismer for å være sikker på at du håndterer dataene riktig |
Det er ikke en oppskrift som plasserer alt. Å velge en er i stor grad avhengig av konteksten og oppgaven du prøver å oppnå.
Nå går vi et skritt videre og dykker inn i samtidighet. Samtidighet er ofte misforstått og feilaktig for parallellisme. Det er ikke tilfelle. Sammenligning innebærer planlegging av uavhengig kode som skal utføres på en kooperativ måte. Benytt deg av det faktum at et stykke kode venter på I / O-operasjoner, og i løpet av den tiden kjører en annen, men uavhengig del av koden.
I Python kan vi oppnå lette samtidige oppførsel via grønt. Fra et parallelliseringsperspektiv er det bruk av tråder eller grøfter tilsvarende fordi ingen av dem går parallelt. Greenlets er enda billigere å lage enn tråder. På grunn av det, er greenlets sterkt brukt til å utføre et stort antall enkle I / O-oppgaver, som de som vanligvis finnes i nettverk og webservere.
Nå som vi kjenner forskjellen mellom tråder og prosesser, parallelt og samtidig, kan vi illustrere hvordan ulike oppgaver utføres på de to paradigmene. Her er det vi skal gjøre: Vi vil løpe flere ganger, en oppgave utenfor GIL og en i den. Vi kjører dem serielt, bruker tråder og bruker prosesser. La oss definere oppgavene:
import us import time import threading import multiprocessing NUM_WORKERS = 4 def only_sleep (): "" "Ikke gjør noe, vent på en tidtaker som utløper" "" print ("PID:% s, Prosessnavn:% s, Trådnavn:% s "% (os.getpid (), multiprocessing.current_process () .navn, threading.current_thread () .navn)) time.sleep (1) def crunch_numbers ():" "" Utfør noen beregninger " :% s, Prosessnavn:% s, Trådnavn:% s "% (os.getpid (), multiprocessing.current_process () .navn, threading.current_thread () .navn)) x = 0 mens x < 10000000: x += 1
Vi har opprettet to oppgaver. Begge er langsiktige, men bare crunch_numbers
utfører beregninger aktivt. La oss løpe only_sleep
serielt, multithreaded og bruker flere prosesser og sammenligner resultatene:
## Kjør oppgaver serielt start_time = time.time () for _ i rekkevidde (NUM_WORKERS): only_sleep () end_time = time.time () print ("Seriell tid =", end_time - start_time) # Kjør oppgaver ved hjelp av tråder start_time = tid .time () tråder = [threading.Thread (target = only_sleep) for _ i rekkevidde (NUM_WORKERS)] [thread.start () for tråd i tråder] [thread.join () for tråd i tråder] end_time = time.time () print ("Tråder tid =", sluttid - starttid) # Kjør oppgaver ved hjelp av prosesser start_time = time.time () processes = [multiprocessing.Process (target = only_sleep) for _ in range (NUM_WORKERS)] [prosess. start () for prosess i prosesser] [process.join () for prosess i prosesser] end_time = time.time () print ("Parallel tid =", sluttid - starttid)
Her er produksjonen jeg har (din skal være lik, selv om PID og tider vil variere litt):
PID: 95726, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 95726, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 95726, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 95726, Prosessnavn: MainProcess, Trådnavn : MainThread Seriell tid = 4.018089056015015 PID: 95726, Prosessnavn: MainProcess, Trådnavn: Tråd-1 PID: 95726, Prosessnavn: MainProcess, Trådnavn: Tråd-2 PID: 95726, Prosessnavn: Hovedprosess, Trådnavn: Tråd- 3 PID: 95726, Prosessnavn: MainProcess, Trådnavn: Tråd-4 Tråder time = 1.0047411918640137 PID: 95728, Prosessnavn: Prosess-1, Trådnavn: MainThread PID: 95729, Prosessnavn: Prosess-2, Trådnavn: MainThread PID: 95730, Prosessnavn: Prosess-3, Trådnavn: MainThread PID: 95731, Prosessnavn: Prosess-4, Trådnavn: MainThread Parallelltid = 1.014023780822754
Her er noen observasjoner:
I tilfelle av seriell tilnærming, ting er ganske åpenbare. Vi løper oppgavene etter hverandre. Alle fire løpene blir utført av samme tråd av samme prosess.
Bruke prosesser Vi kutter kjøretiden ned til en fjerdedel av den opprinnelige tiden, ganske enkelt fordi oppgavene utføres parallelt. Legg merke til hvordan hver oppgave utføres i en annen prosess og på MainThread
av den prosessen.
Bruke tråder Vi utnytter det faktum at oppgavene kan utføres samtidig. Utførelsestiden reduseres også til kvart, selv om ingenting kjører parallelt. Slik går det: Vi gyter den første tråden, og den begynner å vente på at timeren skal utløpe. Vi stopper utførelsen, la det vente på at timeren skal utløpe, og i denne tiden hekser vi den andre tråden. Vi gjentar dette for alle trådene. I et øyeblikk utløper timeren til den første tråden slik at vi bytter utførelse til det, og vi avslutter det. Algoritmen gjentas for den andre og for alle de andre trådene. På slutten, er resultatet som om ting kjørte parallelt. Du vil også legge merke til at de fire forskjellige trådene grener ut fra og lever i samme prosess: MainProcess
.
Du kan til og med legge merke til at den gjengede tilnærmingen er raskere enn den virkelig parallelle. Det skyldes overhead av gyteprosesser. Som nevnt tidligere er gyte- og bytteprosesser en dyr operasjon.
La oss gjøre samme rutine, men denne gangen kjører crunch_numbers
oppgave:
start_time = time.time () for _ i rekkevidde (NUM_WORKERS): crunch_numbers () end_time = time.time () print ("Serietid =", end_time - start_time) start_time = time.time () tråder = [threading.Thread (target = crunch_numbers) for _ i rekkevidde (NUM_WORKERS)] [thread.start () for tråd i tråder] [thread.join () for tråd i tråder] end_time = time.time () print ("Tråder tid =", end_time - start_time) start_time = time.time () processes = [multiprocessing.Process (target = crunch_numbers) for _ i rekkevidde (NUM_WORKERS)] [process.start () for prosess i prosesser] [process.join () for prosess i prosesser] end_time = time.time () print ("Parallel tid =", sluttid - starttid)
Her er produksjonen jeg har:
PID: 96285, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 96285, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 96285, Prosessnavn: MainProcess, Trådnavn: MainThread PID: 96285, Prosessnavn: MainProcess, Trådnavn : MainThread Seriell tid = 2.705625057220459 PID: 96285, Prosessnavn: MainProcess, Trådnavn: Tråd-1 PID: 96285, Prosessnavn: MainProcess, Trådnavn: Tråd-2 PID: 96285, Prosessnavn: MainProcess, Trådnavn: Tråd- 3 PID: 96285, Prosessnavn: MainProcess, Trådnavn: Tråd-4 Tråder time = 2.6961309909820557 PID: 96289, Prosessnavn: Prosess-1, Trådnavn: MainThread PID: 96290, Prosessnavn: Prosess-2, Trådnavn: MainThread PID: 96291, Prosessnavn: Prosess-3, Trådnavn: MainThread PID: 96292, Prosessnavn: Prosess-4, Trådnavn: MainThread Parallelltid = 0.8014059066772461
Hovedforskjellen her er i følge multithreaded tilnærming. Denne gangen virker det veldig likt den serielle tilnærmingen, og her er hvorfor: siden det utfører beregninger og Python ikke utfører ekte parallellitet, går trådene i utgangspunktet i gang etter hverandre, og gir utførelse til hverandre til de alle er ferdige.
Python har rike APIer for parallell / samtidig programmering. I denne opplæringen dekker vi de mest populære, men du må vite at for ethvert behov du har på dette domenet, er det nok noe der ute som kan hjelpe deg med å nå målet ditt.
I neste avsnitt bygger vi et praktisk program i mange former, ved hjelp av alle de presenterte bibliotekene. Uten videre ado, her er modulene / bibliotekene vi skal dekke:
gjenging
: Den vanlige måten å jobbe med tråder i Python. Det er et API-wrapper på høyere nivå over funksjonaliteten som er utsatt av _tråd
modul, som er et grensesnitt på lavt nivå over operativsystemets trådimplementering.
concurrent.futures
: En moduldel av standardbiblioteket som gir et enda høyere abstraksjonslag over tråder. Trådene er modellert som asynkrone oppgaver.
multi
: Ligner på gjenging
modul, som tilbyr et veldig lignende grensesnitt, men bruker prosesser i stedet for tråder.
gevent og greenlets
: Greenlets, også kalt mikro-tråder, er utførelsesenheter som kan planlegges samarbeidende og kan utføre oppgaver samtidig uten mye overhead.
selleri
: En distribuert oppgavekø på høyt nivå. Oppgavene er i kø og utført samtidig ved hjelp av ulike paradigmer som multi
eller gevent
.
Å vite teorien er fin og fin, men den beste måten å lære er å bygge noe praktisk, ikke sant? I denne delen skal vi bygge en klassisk type søknad som går gjennom alle de forskjellige paradigmene.
La oss bygge et program som kontrollerer oppetid på nettsteder. Det finnes mange slike løsninger der ute, de mest kjente er nok Jetpack Monitor og Uptime Robot. Formålet med disse appene er å varsle deg når nettstedet ditt er nede, slik at du raskt kan gjøre noe. Slik fungerer de:
Derfor er det viktig å ta en parallell / samtidig tilnærming til problemet. Etter hvert som listen over nettsteder vokser, vil det ikke garantere oss at vi går gjennom listen serielt, hver nettside blir sjekket hvert femte minutt eller så. Nettstedene kan være ned i flere timer, og eieren vil ikke bli varslet.
La oss begynne med å skrive noen verktøy:
# utils.py importtid import logging import forespørsler klasse WebsiteDownException (Unntak): pass def ping_website (adresse, timeout = 20): "" "Sjekk om et nettsted er nede. Et nettsted vurderes ned hvis enten status_code> = 400 eller hvis tidsavbrudd utløper Kast en WebsiteDownException hvis noen av nettstedets nede vilkår er oppfylt "" "prøv: response = requests.head (adresse, timeout = timeout) hvis response.status_code> = 400: logging.warning (" Nettsted% s returneres status_code =% s "% (adresse, response.status_code)) øke WebsiteDownException () unntatt requests.exceptions.RequestException: logging.warning (" Timeout utløpt for nettside% s "% adresse) heve WebsiteDownException () def notify_owner (adresse) "" Send eieren av adressen et varsel om at nettstedet deres er nede. For nå skal vi bare sove i 0,5 sekunder, men dette er hvor du vil sende en e-post, push notification eller tekstmelding "" "logging. info ("Melde eieren av% s nettside"% adresse) time.sleep (0.5) def check_webs ite (adresse): "" "Utility-funksjon: Kontroller om et nettsted er nede, hvis det er så, gi brukeren beskjed om å prøve: ping_website (adresse) unntatt WebsiteDownException: notify_owner
Vi trenger faktisk en nettside liste for å prøve systemet vårt. Lag din egen liste eller bruk min:
# websites.py WEBSITE_LIST = ['http://envato.com', 'http://amazon.co.uk', 'http://amazon.com', 'http://facebook.com', ' http://google.com ',' http://google.fr ',' http://google.es ',' http://google.co.uk ',' http://internet.org ' , 'http://gmail.com', 'http://stackoverflow.com', 'http://github.com', 'http://heroku.com', 'http: // really cool- available-domain.com ',' http://djangoproject.com ',' http://rubyonrails.org ',' http://basecamp.com ',' http://trello.com ',' http: //yiiframework.com ',' http://shopify.com ',' http://another-really-interesting-domain.co ',' http://airbnb.com ',' http: // instagram. com ',' http://snapchat.com ',' http://youtube.com ',' http://baidu.com ',' http://yahoo.com ',' http: // live. com ',' http://linkedin.com ',' http://yandex.ru ',' http://netflix.com ',' http://wordpress.com ',' http: // bing. com ',]
Normalt vil du beholde denne listen i en database sammen med eierkontaktinformasjon, slik at du kan kontakte dem. Siden dette ikke er hovedtemaet i denne opplæringen, og for enkelhets skyld, skal vi bare bruke denne Python-listen.
Hvis du har betalt veldig god oppmerksomhet, har du kanskje lagt merke til to virkelig lange domener i listen som ikke er gyldige nettsteder (jeg håper ingen kjøpte dem når du leser dette for å bevise meg feil!). Jeg la til disse to domenene for å være sikker på at vi har noen nettsteder nede på hver runde. La oss også nevne vår app UptimeSquirrel.
Først, la oss prøve den serielle tilnærmingen og se hvor dårlig det utfører. Vi vurderer dette som utgangspunkt.
# serial_squirrel.py importtid start_time = time.time () for adresse i WEBSITE_LIST: check_website (adresse) end_time = time.time () print ("Tid for SerialSquirrel:% ssecs"% (end_time - start_time)) # ADVARSEL: root : Timeout utløpt for nettstedet http://really-cool-available-domain.com # ADVARSEL: root: Timeout utløpt for nettstedet http://another-really-interesting-domain.co # ADVARSEL: root: Nettsted http: // bing.com returnerte status_code = 405 # Tid for SerialSquirrel: 15.881232261657715secs
Vi skal få litt mer kreativ med implementeringen av den trådte tilnærmingen. Vi bruker en kø for å sette adressene inn og opprette arbeidstråder for å få dem ut av køen og behandle dem. Vi skal vente på at køen er tom, noe som betyr at alle adressene har blitt behandlet av våre arbeiderråder.
# threaded_squirrel.py importtid fra kø import Kjøre fra threading import Tråd NUM_WORKERS = 4 task_queue = Queue () def worker (): # Kontroller hele køen for adresser mens True: address = task_queue.get () check_website (adresse) # Mark den behandlede oppgaven som ferdig task_queue.task_done () start_time = time.time () # Opprett arbeidstrådens tråder = [Tråd (mål = arbeider) for _ i rekkevidde (NUM_WORKERS)] # Legg til nettstedene i oppgavekøen [task_queue. sett (element) for element i WEBSITE_LIST] # Start alle arbeiderne [thread.start () for tråd i tråder] # Vent på at alle oppgavene i køen skal behandles task_queue.join () end_time = time.time () print ("Time for ThreadedSquirrel:% ssecs")% (end_time - start_time)) # ADVARSEL: root: Timeout utløpt for nettstedet http://really-cool-available-domain.com # ADVARSEL: root: Timeout utløpt for nettstedet http: / /another-really-interesting-domain.co # ADVARSEL: root: Nettsted http://bing.com returnert status_code = 405 # Tid for threadedSquirrel: 3.1107530 59387207secs
Som tidligere nevnt, concurrent.futures
er et høyt nivå API for bruk av tråder. Tilnærmingen vi tar her innebærer å bruke en ThreadPoolExecutor
. Vi skal sende oppgaver til bassenget og få tilbake futures, som er resultater som vil være tilgjengelige for oss i fremtiden. Selvfølgelig kan vi vente på at alle futures blir faktiske resultater.
# future_squirrel.py importtid import samtidig.futures NUM_WORKERS = 4 start_time = time.time () med samtident.futures.ThreadPoolExecutor (max_workers = NUM_WORKERS) som eksekutor: futures = eksekutor.submit (sjekke nettsted, adresse) for adresse i WEBSITE_LIST samtidige.futures.wait (futures) end_time = time.time () print ("Tid for FutureSquirrel:% ssecs"% (end_time - start_time)) # ADVARSEL: root: Timeout utløpt for nettstedet http: // virkelig kjølig -domain.com # ADVARSEL: root: Timeout utløpt for nettstedet http://another-really-interesting-domain.co # ADVARSEL: root: Nettsted http://bing.com returnert status_code = 405 # Tid for FutureSquirrel: 1.812899112701416secs
De multi
biblioteket gir en nesten erstatningsløsnings-API for gjenging
bibliotek. I dette tilfellet skal vi ta en tilnærming som ligner på concurrent.futures
en. Vi setter opp en multiprocessing.Pool
og sende inn oppgaver til det ved å kartlegge en funksjon til listen over adresser (tenk på den klassiske Python kart
funksjon).
# multiprocessing_squirrel.py importtid import sokkel import multiprocessing NUM_WORKERS = 4 start_time = time.time () med multiprocessing.Pool (prosesser = NUM_WORKERS) som basseng: results = pool.map_async (check_website, WEBSITE_LIST) results.wait () end_time = tid .time () print ("Tid for MultiProcessingSquirrel:% ssecs"% (end_time - start_time)) # ADVARSEL: root: Timeout utløpt for nettstedet http://really-cool-available-domain.com # ADVARSEL: root: Timeout utløpt for nettstedet http://another-really-interesting-domain.co # ADVARSEL: root: Nettsted http://bing.com returnert status_code = 405 # Tid for MultiProcessingSquirrel: 2.82245993614196767secs
Gevent er et populært alternativ for å oppnå massiv samtidighet. Det er noen ting du må vite før du bruker det:
Kode utført samtidig av greenlets er deterministisk. I motsetning til de andre presenterte alternativene, garanterer dette paradigmet at for alle to identiske løp vil du alltid få de samme resultatene i samme rekkefølge.
Du må monkey patch standard funksjoner slik at de samarbeider med gevent. Her mener jeg det. Normalt blokkerer en sokkeloperasjon. Vi venter på at operasjonen skal fullføres. Hvis vi var i et multithreaded miljø, ville planleggeren bare bytte til en annen tråd mens den andre venter på I / O. Siden vi ikke er i et multithreaded miljø, gir patenter standardfunksjonene slik at de blir ikke-blokkerende og returnerer kontrollen til den planlagte scheduleren.
For å installere hentet, kjør: pip installeres
Slik bruker du gevent til å utføre oppgaven vår ved hjelp av en gevent.pool.Pool
:
# green_squirrel.py importtid fra gevent.pool import Pool fra gevent import ape # Merk at du kan gyte mange arbeidere med gevent siden kostnaden for å opprette og bytte er svært lav NUM_WORKERS = 4 # Socket-modul for HTTP-forespørsler ape. patch_socket () start_time = time.time () pool = Pool (NUM_WORKERS) for adressen i WEBSITE_LIST: pool.spawn (check_website, address) # Vent på at ting skal fullføres pool.join () end_time = time.time () print Tid for GreenSquirrel:% ssecs "% (end_time - start_time)) # Tid for GreenSquirrel: 3.8395519256591797secs
Selleri er en tilnærming som stort sett avviger fra det vi har sett så langt. Det er kamptest i sammenheng med svært komplekse og høykvalitetsmiljøer. Oppsett av Selleri vil kreve litt mer tinkering enn alle ovennevnte løsninger.
Først må vi installere Selleri:
pip installere selleri
Oppgaver er de sentrale konseptene i Celery-prosjektet. Alt som du vil løpe inne i Selleri, må være en oppgave. Selleri gir stor fleksibilitet for løpende oppgaver: Du kan kjøre dem synkront eller asynkront, i sanntid eller planlagt, på samme maskin eller på flere maskiner, og bruke tråder, prosesser, Eventlet eller gevent.
Arrangementet vil bli litt mer komplekst. Selleri bruker andre tjenester for å sende og motta meldinger. Disse meldingene er vanligvis oppgaver eller resultater fra oppgaver. Vi skal bruke Redis i denne opplæringen til dette formålet. Redis er et godt valg fordi det er veldig enkelt å installere og konfigurere, og det er virkelig mulig at du allerede bruker den i søknaden din til andre formål, for eksempel caching og pub / sub.
Du kan installere Redis ved å følge instruksjonene på Redis hurtigstartside. Ikke glem å installere Redis
Python bibliotek, pip installere redis
, og bunken som er nødvendig for bruk av Redis og Selleri: pip installere selleri [redis]
.
Start Redis-serveren slik: $ redis-server
For å komme i gang med å bygge ting med Selleri, må vi først lage et Selleri-program. Deretter må Selleri vite hva slags oppgaver det kan utføre. For å oppnå det, må vi registrere oppgaver i selderapplikasjonen. Vi gjør dette ved hjelp av @ app.task
dekoratør:
# celery_squirrel.py importtid fra utils import check_website fra dataimport WEBSITE_LIST fra selleri import Selleri fra selleri.result import ResultSet app = Selleri ('celery_squirrel', broker = "redis: // localhost: 6379/0", backend = "redis : // localhost: 6379/0 ") @ app.task def check_website_task (adresse): returner check_website (adresse) hvis __name__ ==" __main__ ": start_time = time.time () # Ved å bruke 'forsinkelse' = ResultatSett (): (Sjekk adressen på adressen i WEBSITE_LIST)) # Vent til oppgavene er ferdige rs.get () end_time = time.time () print ("Selleriquirrel:", end_time - start_time) # Selleriquirrel: 2.4979639053344727
Ikke bli panikk hvis ingenting skjer. Husk at Selleri er en tjeneste, og vi må kjøre den. Til nå har vi bare plassert oppgavene i Redis, men begynte ikke Selleri å utføre dem. For å gjøre det, må vi kjøre denne kommandoen i mappen der koden vår er:
selleriarbeider-en do_celery --loglevel = debug --concurrency = 4
Nå gjenopprett Python-skriptet og se hva som skjer. En ting å være oppmerksom på: Legg merke til hvordan vi passerte Redis-adressen til Redis-søknaden to ganger. De megler
parameteren spesifiserer hvor oppgavene sendes til selger, og baksiden
er hvor Selleri legger resultatene slik at vi kan bruke dem i vår app. Hvis vi ikke angir et resultat baksiden
, Det er ingen måte for oss å vite når oppgaven ble behandlet og hva resultatet var.
Vær også oppmerksom på at loggene nå er i standardproduksjonen av Selleri-prosessen, så sørg for å sjekke dem ut i riktig terminal.
Jeg håper dette har vært en interessant reise for deg og en god introduksjon til verden av parallell / samtidig programmering i Python. Dette er slutten på reisen, og det er noen konklusjoner vi kan tegne:
gjenging
og concurrent.futures
biblioteker.multi
gir et veldig lignende grensesnitt til gjenging
men for prosesser i stedet for tråder.Lær Python med vår komplette pythonveiledning, enten du er bare i gang eller du er en erfaren coder som ønsker å lære nye ferdigheter..