Reaktive programmeringsoperatører i RxJava 2

Hvis Android-appen din skal rack opp de femstjerners vurderingene i Google Play, må den være i stand til å kunne multi-oppgave.

Som et minimum, forventer dagens mobilbrukere fortsatt å kunne kommunisere med appen din mens den jobber i bakgrunnen. Dette kan høres greit ut, men Android er enkeltgjenget som standard, så hvis du skal møte publikums forventninger, må du før eller senere legge til noen flere tråder.

I den forrige artikkelen i denne serien fikk vi en introduksjon til RxJava, et reaktivt bibliotek for JVM som kan hjelpe deg med å lage Android-applikasjoner som reagerer på data og hendelser som de oppstår. Men du kan også bruke dette biblioteket til å reagere på data og hendelser samtidig.

I dette innlegget skal jeg vise deg hvordan du kan bruke RxJavas operatører til å endelig gjøre samtidighet på Android en smertefri opplevelse. Ved slutten av denne artikkelen vil du vite hvordan du bruker RxJava-operatører til å lage flere tråder, angi arbeidet som skal oppstå på disse tråder, og deretter legge resultatene tilbake til Android's viktigste viktige hovedbruddstråd-alt med bare en få linjer med kode.

Og siden ingen teknologi er perfekt, vil jeg også fortelle deg om et stort potensielt fallgrop om å legge RxJava-biblioteket til dine prosjekter - før du viser hvordan du bruker operatører for å sikre dette problemet aldri skjer i dine egne Android-prosjekter.

Innføring av operatører

RxJava har en enorm samling av operatører som hovedsakelig er ment å hjelpe deg med å modifisere, filtrere, slå sammen og transformere dataene som sendes ut av din observers. Du finner den komplette listen over RxJava-operatører over på de offisielle dokumentene, og selv om ingen forventer at du skal huske hver enkelt operatør, Det er verdt å bruke litt tid på å lese gjennom denne listen, bare slik at du har en grov ide om de forskjellige datatransformasjonene du kan utføre.

RxJavas liste over operatører er allerede ganske uttømmende, men hvis du ikke finner den perfekte operatøren for datatransformasjonen du hadde i tankene, kan du alltid koble flere operatører sammen. Bruke en operatør til en observer returnerer vanligvis en annen observer, så du kan bare fortsette å bruke operatører til du får de resultatene du vil ha.

Det er alt for mange RxJava-operatører til å dekke i en enkelt artikkel, og de offisielle RxJava-dokumentene gjør allerede en god jobb med å introdusere alle operatørene du kan bruke til datatransformasjoner, så jeg skal fokusere på to operatører som har mest mulig potensial for å gjøre livet ditt som en Android-utvikler enklere: subscribeOn () og observeOn ()

Multithreading Med RxJava Operatører

Hvis appen din skal gi best mulig brukeropplevelse, må den kunne utføre intensive eller langvarige oppgaver og utføre flere oppgaver samtidig, uten å blokkere Android's viktigste viktige hovedbruddstråd.

For eksempel, tenk at appen din trenger å hente litt informasjon fra to forskjellige databaser. Hvis du utfører begge disse oppgavene etter hverandre på Android-hovedtråden, vil dette ikke bare ta mye tid, men brukergrensesnittet vil ikke reagere før appen din har fullført alle opplysninger fra begge databasene. . Ikke akkurat en flott brukeropplevelse!

En langt bedre løsning er å lage to ytterligere tråder der du kan utføre begge disse oppgavene samtidig uten at de blokkerer hovedbruddstråden. Denne tilnærmingen betyr at arbeidet blir fullført dobbelt så fort, og brukeren vil kunne fortsette å samhandle med appens brukergrensesnitt i hele. Potensielt kan brukerne dine ikke engang være oppmerksom på at appen din utfører noe intensivt og langvarig arbeid i bakgrunnen. All databaseinformasjon vil bare vises i programmets brukergrensesnitt, som om det er magisk!

Ut av boksen gir Android noen verktøy som du kan bruke til å lage flere tråder, inkludert Services og IntentServices, men disse løsningene er vanskelige å implementere og kan raskt resultere i komplisert, verbose kode. I tillegg, hvis du ikke implementerer multithreading korrekt, kan du finne deg selv med et program som lekker minne og kaster alle slags feil.

For å gjøre multithreading på Android enda mer hodepine-induserende, er Android hovedbruker-tråd den eneste tråden som kan oppdatere appens brukergrensesnitt. Hvis du vil oppdatere appens brukergrensesnitt med resultatet av arbeidet som utføres på noen annen tråd, da må du vanligvis lage en handler på hovedbruddstråden, og bruk deretter dette handler å overføre data fra bakgrunns tråden til hovedtråden. Dette betyr mer kode, mer kompleksitet og flere muligheter for feil å krype inn i prosjektet.

Men RxJava har to operatører som kan hjelpe deg med å unngå mye av denne kompleksiteten og muligheten for feil.

Merk at du bruker disse operatørene i forbindelse med planleggere, som i hovedsak er komponenter som lar deg spesifisere tråder. For nå, tenk bare på planleggeren som synonym med ordet tråd.

  • subscribeOn (planlegger): Som standard, en observer sender ut data på tråden der abonnementet ble erklært, dvs. hvor du ringte .abonnere metode. I Android er dette vanligvis den viktigste brukergrensesnittet. Du kan bruke subscribeOn () operatør for å definere en annen planlegger hvor i observer bør utføre og avgi dataene sine.
  • observeOn (planlegger): Du kan bruke denne operatøren til å omdirigere din observers utslipp til en annen planlegger, effektivt endre tråden der observerMeldinger sendes, og i forlengelse tråden der dataene blir brukt.

RxJava kommer med en rekke planleggere som du kan bruke til å lage forskjellige tråder, inkludert:

  • Schedulers.io (): Designet for å brukes til IO-relaterte oppgaver. 
  • Schedulers.computation (): Designet for å brukes til beregningsoppgaver. Som standard er antall tråder i beregningsplanleggeren begrenset til antall CPUer som er tilgjengelige på enheten din.
  • Schedulers.newThread (): Oppretter en ny tråd.

Nå har du en oversikt over alle bevegelige deler, la oss se på noen eksempler på hvordan subscribeOn () og observeOn () brukes, og se noen planleggere i aksjon.

subscribeOn ()

I Android bruker du vanligvis subscribeOn () og en tilhørende planlegger å endre tråden der noe langvarig eller intensivt arbeid utføres, så det er ingen risiko for å blokkere hovedbruddstråden. Du kan for eksempel bestemme deg for å importere en stor mengde data på io () planlegger eller utfør noen beregninger på beregning () planleggeren.

I den følgende koden lager vi en ny tråd der observer vil utføre operasjonen og avgi verdiene 1, 2, og 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Mens dette er alt du trenger for å lage en tråd og begynne å sende data på den tråden, vil du kanskje ha noen bekreftelse på at dette observerbare virkelig fungerer på en ny tråd. En metode er å skrive ut navnet på tråden som søknaden din bruker for øyeblikket, i Android StudioLogcat Monitor.

Praktisk, i det forrige innlegget, Kom i gang med RxJava, opprettet vi et program som sender meldinger til Logcat Monitor på ulike stadier i Observables livssyklus, slik at vi kan bruke mye av denne koden på nytt..

Åpne prosjektet du opprettet i det innlegget, og juster koden slik at den bruker ovennevnte observer som kilde observer. Legg deretter til subscribeOn () Operatør og angi at meldingene som sendes til Logcat, skal inneholde navnet på gjeldende tråd.

Ditt ferdige prosjekt bør se slik ut:

importer android.support.v7.app.AppCompatActivity; importere android.os.Bundle; importer android.util.Log; importere io.reactivex.Observable; importere io.reactivex.Observer; importere io.reactivex.disposables.Disposable; importere io.reactivex.schedulers.Schedulers; offentlig klasse MainActivity utvider AppCompatActivity offentlig statisk endelig streng TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Observer Observer = ny Observer() @Overtrid offentlig tomgang på Abonnement (Disponibel d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Override public void onNext (Integer value) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Kontroller at Android Studio's Logcat Monitor er åpen (ved å velge Android Monitor fanen, etterfulgt av logcat) og kjør prosjektet ditt på enten en fysisk Android-enhet eller en AVD. Du bør se følgende utgang i Logcat Monitor:

Her kan du se det .abonnere blir kalt på hovedbruddstråden, men det observerbare virker på en helt annen tråd.

De subscribeOn () Operatøren vil ha samme effekt uansett hvor du plasserer den i den observerbare kjeden; Du kan imidlertid ikke bruke flere subscribeOn () operatører i samme kjede. Hvis du inkluderer flere enn én subscribeOn (), så vil kjeden din bare bruke subscribeOn () det er nærmest kilden observerbar.

observeOn ()

I motsetning til subscribeOn (), hvor du plasserer observeOn () i kjeden din gjør noe som betyr at denne operatøren bare endrer tråden som brukes av observerbarhetene som vises nedstrøms

Hvis du for eksempel satt inn følgende i kjeden din, så vil alle observable som vises i kjeden fra dette punktet, bruke den nye tråden.

.observeOn (Schedulers.newThread ())

Denne kjeden vil fortsette å løpe på den nye tråden til den møter en annen observeOn () operatør, på hvilket tidspunkt vil det bytte til tråden spesifisert av den operatøren. Du kan kontrollere tråden der bestemte observerbare data sender sine varsler ved å sette inn flere observeOn () operatører inn i kjeden din.

Når du utvikler Android-apper, bruker du vanligvis observeOn () å sende resultatet av arbeid utført på bakgrunns tråder til Android hovedbruker-tråden. Den enkleste måten å omdirigere utslipp til Android's hovedbruker-tråd er å bruke AndroidSchedulers.mainThread Scheduler, som er inkludert som en del av RxAndroid-biblioteket, i stedet for RxJava-biblioteket. 

RxAndroid-biblioteket inneholder Android-spesifikke bindinger for RxJava 2, noe som gjør det til en verdifull tilleggsressurs for Android-utviklere (og noe vi ser nærmere på i neste innlegg i denne serien).

Hvis du vil legge til RxAndroid i prosjektet ditt, åpner du modulnivået build.gradle fil og legg til den nyeste versjonen av biblioteket til avhengighetsdelen. I skrivende stund var den nyeste versjonen av RxAndroid 2.0.1, så jeg legger til følgende:

avhengigheter ... kompilere 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Etter at du har lagt til dette biblioteket i prosjektet ditt, kan du angi at resultatene av en observerbar skal sendes til appens hovedbruker-tråd, ved hjelp av en enkelt linje med kode:

.observeOn (AndroidSchedulers.mainThread ())

Med tanke på at kommunikasjon med appens hovedbruker-tråd tar opp en full side av de offisielle Android-dokumentene, er dette en stor forbedring som potensielt kan spare deg for mye tid når du oppretter multithreaded Android-applikasjoner.

RxJava er stor ulempe

Mens RxJava har mye å tilby Android-utviklere, er ingen teknologi perfekt, og RxJava har en stor fallgruve som har potensial til å krasje appen din.

Som standard driver RxJava en push-basert arbeidsflyt: data produseres oppstrøms av en observer, og skyves deretter nedstrøms til den tilordnede Observer. Hovedproblemet med en push-basert arbeidsflyt er hvor enkelt det er for produsenten (i dette tilfellet, den observer) å sende ut varer for fort for forbrukeren (Observer) å behandle.

En chatty observer og en sakte Observer kan raskt resultere i en etterspørsel av ubrukte elementer, noe som vil gi opp systemressurser og kan til og med resultere i en OutOfMemoryException. Dette problemet er kjent som mottrykk.

Hvis du mistenker at tilbaketrykket forekommer i appen din, er det noen få mulige løsninger, inkludert bruk av en operatør for å redusere antall gjenstander som blir produsert.

Opprette prøvetakingsperioder med prøve() og throttlefirst ()

Hvis en observer sender ut et stort antall elementer, så det kan ikke være nødvendig for den tildelte Observer å motta hver enkelt ett av disse elementene.

Hvis du trygt kan ignorere noen av en observerutslippene, så er det noen få operatører du kan bruke til å opprette prøveperioden, og deretter kirsebærvalg bestemte verdier som sendes ut i disse perioder:

  • De prøve() Operatøren kontrollerer utsigtsverdienes utgang med intervaller spesifisert av deg, og tar deretter det siste elementet som ble sendt ut i løpet av prøvetiden. For eksempel, hvis du inkluderer .prøve (5, SECONDS) I prosjektet ditt vil Observer motta den siste verdien som ble utgitt i hvert fem sekunders intervall. 
  • De throttleFirst () Operatør tar den første verdien som ble sendt ut under prøvetiden. For eksempel, hvis du inkluderer .throttlefirst (5, SECONDS) så vil Observer motta den første verdien som sendes ut under hvert fem sekunders intervall.  

Batching Emissions With buffer()

Hvis du ikke trygt kan hoppe over noen utslipp, kan du fortsatt være i stand til å ta noe press ut av en kamp Observer ved å gruppere utslipp i batcher og deretter sende videre en masse. Bearbeiding av batchemisjoner er vanligvis mer effektiv enn å behandle flere utslipp separat, så denne tilnærmingen bør forbedre forbrukstakten.

Du kan opprette batchede utslipp ved hjelp av buffer() operatør. Her bruker vi buffer() å batch alle elementene som sendes ut over en tre-sekunders periode:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Alternativt kan du bruke buffer() å lage et parti som består av et bestemt antall utslipp. For eksempel, her forteller vi buffer() å pakke utslipp til grupper på fire:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Bytte observasjoner med flowables

En alternativ metode for å redusere antall utslipp er å erstatte observer det forårsaker problemer med a Flow.

I RxJava 2 bestemte RxJava-teamet å splitte standarden observer inn i to typer: den vanlige typen vi har sett på gjennom hele denne serien, og Flows.

Flows funksjon på omtrent samme måte som observers, men med en stor forskjell: Flows bare sende så mange elementer som observatør forespørsler. Hvis du har en observer som sender ut flere elementer enn den tildelte observatøren kan forbruke, kan du vurdere å bytte til en Flow i stedet.

Før du kan begynne å bruke Flows i prosjektene dine, må du legge til følgende importerklæring:

importere io.reactivex.Flowable;

Du kan da opprette Flows bruker nøyaktig samme teknikker som brukes til å lage observers. For eksempel vil hver av de følgende kodestykker opprette en Flow som er i stand til å sende data:

Flow flytbar = Flowable.fromArray (new String [] "south", "north", "west", "east"); ... flowable.subscribe
Flow flytbar = Flowable.range (0, 20); ... flowable.subscribe ()

På dette punktet kan du lure på: Hvorfor skulle jeg noen gang bruke observers når jeg bare kan bruke Flows og ikke trenger å bekymre deg om tilbaketrykk? Svaret er at a Flow oppstår mer av en overhead enn en vanlig observer, så for å skape en høypresterende app, bør du holde fast ved observers med mindre du mistenker at søknaden din sliter med tilbakeslag.

singles

EN Flow er ikke den eneste variasjonen på observer som du finner i RxJava, som biblioteket også inneholder Enkelt klasse.

singles er nyttige når du bare trenger å sende en verdi. I disse scenariene skaper du en observer kan føles som overkill, men a Enkelt er designet for å bare sende en enkelt verdi og deretter fullføre, enten ved å ringe:

  • onSuccess (): Den Enkelt utsender sin eneste verdi.  
  • onError (): Hvis Enkelt kan ikke sende ut elementet, så vil det passere denne metoden den resulterende Throwable.

EN Enkelt vil bare ringe til en av disse metodene, og deretter avsluttes umiddelbart.

La oss se på et eksempel på a Enkelt i handling - igjen, for å spare tid vi bruker koden igjen:

importere android.os.Bundle; importer android.support.v7.app.AppCompatActivity; importer android.util.Log; importere io.reactivex.Single; importere io.reactivex.SingleObserver; importere io.reactivex.disposables.Disposable; offentlig klasse MainActivity utvider AppCompatActivity offentlig statisk endelig streng TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  privat SingleObserver getSingleObserver () returner ny SingleObserver() @Overtrid offentlig tomgang på Abonnement (Disponibel d) Log.e (TAG, "abonnement");  @Override public void onSuccess (String value) Log.e (TAG, "onSuccess:" + verdi);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Kjør prosjektet ditt på en AVD eller fysisk Android-enhet, og du vil se følgende utdata i Android Studio's Logcat Monitor:

Hvis du skifter deg og ønsker å konvertere en Enkelt inn i en observer når som helst, så har RxJava nok en gang alle operatørene du trenger, inkludert:

  • slå seg sammen med(): Fusjonerer flere singles inn i en enkelt observer
  • concatWith (): Kjeder elementer utgitt av flere singles sammen, for å danne en observer utslipp. 
  • toObservable (): Konverterer a Enkelt inn i en observer som sender ut elementet som ble opprinnelig utgitt av singelen, og deretter fullfører.

Sammendrag

I dette innlegget har vi utforsket noen RxJava-operatører som du kan bruke til å opprette og administrere flere tråder, uten kompleksitet og potensial for feil som tradisjonelt er ledsaget av multithreading på Android. Vi så også hvordan du kan bruke RxAndroid-biblioteket til å kommunisere med Android's viktigste viktige hovedbruker-tråd med en enkelt linje med kode, og hvordan å sikre tilbaketrykk blir ikke et problem i søknaden din.

Vi har rørt RxAndroid-biblioteket noen ganger gjennom denne serien, men dette biblioteket er fullpakket med Android-spesifikke RxJava-bindinger som kan være uvurderlig når du jobber med RxJava på Android-plattformen, så i siste innlegg i denne serien vil vi se på RxAndroid-biblioteket i mye mer detalj.

Inntil da, sjekk ut noen av våre andre innlegg på koding for Android!