本文將分析RxJava2中常見的操做符。java
RxJava有許多操做符,提供了不少不一樣的功能,而最終的目的就是將上游的Observable轉換成不一樣的Observable對象實現不一樣的功能。舉幾個栗子:數組
操做符先後的對象都是ObservableSource的子類,而且下游會持有上游的對象,在調用subscribeActual的時候也會執行上游的邏輯。app
emmmmm......提及來有點亂,我拿幾個經常使用的操做符來舉例說明。ide
· justui
just的做用是將參數依次發射出來,並且有不少的重載方法this
除了單參數的以外,別的方法都會調用fromArray,這個放到後面說。spa
just單參數方法將上游對象轉變成ObservableJust對象:日誌
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(T value) { this.value = value; } protected void subscribeActual(Observer<? super T> s) { ScalarDisposable sd = new ScalarDisposable(s, this.value); s.onSubscribe(sd); sd.run(); } public T call() { return this.value; } }
ObservableJust的邏輯很清晰,咱們只須要關心ScalarDisposable的run()方法便可。code
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable { /*省略部分代碼*/ final Observer<? super T> observer; final T value; public ScalarDisposable(Observer<? super T> observer, T value) { this.observer = observer; this.value = value; } public void run() { if(this.get() == 0 && this.compareAndSet(0, 2)) { this.observer.onNext(this.value); if(this.get() == 2) { this.lazySet(3); this.observer.onComplete(); } } } }
ScalarDisposable在這裏的邏輯也很清晰,就是把上游的onNext事件交給Observer的onNext方法處理。完成了RxJava的調用。server
· fromArray
上面說到多個參數的just方法都會調用到fromArray方法,拿兩個參數的爲例:
public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(new Object[]{item1, item2}); }
更多參數也同樣,都會把參數轉換成數組傳遞給fromArray方法。而fromArray會返回ObservableFromArray對象,看看ObservableFromArray就知道具體的實現了:
public final class ObservableFromArray<T> extends Observable<T> { /*省略部分代碼*/ final T[] array; public ObservableFromArray(T[] array) { this.array = array; } public void subscribeActual(Observer<? super T> s) { ObservableFromArray.FromArrayDisposable d = new ObservableFromArray.FromArrayDisposable(s, this.array); s.onSubscribe(d); if(!d.fusionMode) { d.run(); } } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> actual; final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.actual = actual; this.array = array; } void run() { Object[] a = this.array; int n = a.length; for(int i = 0; i < n && !this.isDisposed(); ++i) { Object value = a[i]; if(value == null) { this.actual.onError(new NullPointerException("The " + i + "th element is null")); return; } this.actual.onNext(value); } if(!this.isDisposed()) { this.actual.onComplete(); } } } }subscribeActual方法會執行到FromArrayDisposable的run方法。而後遍歷fromArray中傳進來的數組,若是事件流沒有被中止,就把數組中的對象交給Observer的onNext方法處理。
· map
map的做用是將上游發射出來的事件轉變成另外一種事件再傳遞給下游,提及來有點蒼白,仍是用代碼來演示:
Observable .just(Integer.valueOf(1)) .map(new Function<Object, Object>() { @Override public Object apply(@NonNull Object o) throws Exception { return String.valueOf(o); } }) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(Object s) { Log.d("rxrx", "onNext:" + s); Log.d("rxrx", "onNext:" + s.getClass()); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } });
04-25 09:55:43.620 24606-24606/com.newhongbin.lalala D/rxrx: onNext:1 04-25 09:55:43.620 24606-24606/com.newhongbin.lalala D/rxrx: onNext:class java.lang.String
結合日誌能夠看到,上游發送出來的Integer到了下游變成了String,而這個變換的過程就在map中實現。實現原理也比較簡單,在Observable中調用apply方法返回轉換後的對象,再交給Observer的onNext方法處理。
· flatMap、concatMap、switchMap
flatMap、concatMap、switchMap都跟map同樣起到了變換的做用,可是他們變換後的對象都是ObservableSource的實現類,將上游的事件依次包裝成ObservableSource的實現類再與Observer進行關聯。變換前是1個被觀察者,N個事件,變換後就變成N個被觀察者。
三者的區別:
flatMap:發射事件無序。
concatMap:事件有序,與上游的順序一致。
switchMap:若是前面的事件沒有發射出去,就丟棄,發射當前最新的事件。
若是上游分別延時發射 "1","2","3","4","5",通過這三種變換後Observer的執行狀況分別爲:
flatMap:由於無序發射,執行狀況不必定。
concatMap:按順序發射,按順序執行"1","2","3","4","5"。
switchMap:新的事件到來時,上一個尚未發射,因此發射最新的,最終只執行了"5"。
· buffer
buffer用於控制Observer一次處理的數量,結合示例來看:
Observable .just(1,2,3,4,5,6,7,8,9,10) .buffer(3, 3) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable disposable) { Log.d("rxrx", "onSubscribe:"); } @Override public void onNext(Object s) { Log.d("rxrx", "onNext:" + s); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } });
04-25 14:17:30.532 32283-32283/com.newhongbin.lalala D/rxrx: onSubscribe: 04-25 14:17:30.532 32283-32283/com.newhongbin.lalala D/rxrx: onNext:[1, 2, 3] 04-25 14:17:30.532 32283-32283/com.newhongbin.lalala D/rxrx: onNext:[4, 5, 6] 04-25 14:17:30.532 32283-32283/com.newhongbin.lalala D/rxrx: onNext:[7, 8, 9] 04-25 14:17:30.532 32283-32283/com.newhongbin.lalala D/rxrx: onNext:[10]
上游依次發射了10個事件,下游的Obsever每次處理buffer指定的事件數量個,不足的也一次處理。第一個參數表示一次處理的事件數量,第二個參數表示下一次處理事件時要在跳過幾個事件。當兩個參數相同時,Observer會被包裝成BufferExactObserver,看看BufferExactObserver的四個方法:
public void onSubscribe(Disposable s) { if(DisposableHelper.validate(this.s, s)) { this.s = s; this.actual.onSubscribe(this); } } public void onNext(T t) { Collection b = this.buffer; if(b != null) { b.add(t); if(++this.size >= this.count) { this.actual.onNext(b); this.size = 0; this.createBuffer(); } } } public void onError(Throwable t) { this.buffer = null; this.actual.onError(t); } public void onComplete() { Collection b = this.buffer; this.buffer = null; if(b != null && !b.isEmpty()) { this.actual.onNext(b); } this.actual.onComplete(); }
在事件發送到到onNext的時候,並不會當即交給Observer處理,而是放到一個集合中,當集合的數量達到咱們的限定值時會把整個集合交給Observer的onNext處理。最後事件所有發送完畢,調用onComplete,若是這時候集合中還有事件沒有處理的,就先把集合交給Observer的onNext處理,再執行Observer的onComplete方法。
buffer有很是多的重載方法,支持多種數量指定的方式,好比:
public final Observable<List<T>> buffer(long timespan, TimeUnit unit) { return this.buffer(timespan, unit, Schedulers.computation(), 2147483647); }
這個方法能夠指定Observer每次處理單位時間內發射的事件。
· zip
zip的做用是將多個Observable發射出來的事件組合到一塊兒,結合成一個事件序列傳給下游。下面用一個簡單的例子說明:
Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { //發出3個字符串事件「一」、「二」、「三」 observableEmitter.onNext("一"); observableEmitter.onNext("二"); observableEmitter.onNext("三"); observableEmitter.onComplete(); } }); Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception { //發出2個整型事件一、2 observableEmitter.onNext(1); observableEmitter.onNext(2); observableEmitter.onComplete(); } }); Observable.zip(stringObservable, integerObservable, new BiFunction<String, Integer, Object>() { @Override public Object apply(@NonNull String s, @NonNull Integer integer) throws Exception { //多種事件的組合方式 return s + integer; } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable disposable) { Log.d("rxrx", "start"); } @Override public void onNext(Object o) { Log.d("rxrx", o.toString()); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { Log.d("rxrx", "onComplete"); } });
05-10 15:41:21.440 31242-31242/com.newhongbin.lalala D/rxrx: start 05-10 15:41:21.440 31242-31242/com.newhongbin.lalala D/rxrx: 一1 05-10 15:41:21.440 31242-31242/com.newhongbin.lalala D/rxrx: 二2 05-10 15:41:21.440 31242-31242/com.newhongbin.lalala D/rxrx: onComplete
這個例子將兩個Observable組合到一塊兒,按照apply中的合併規則合併事件,並且經過最後的結果也能夠知道,最終的事件數量以較少的Observable的事件數量爲準。我原本想輔以源碼詳細說說zip的原理,可是想一想這個代碼繞來繞去有點暈,我試試能不能簡單的說清楚。
首先上游會傳入n個Observable,組裝成Observable[n]。zip方法會根據Observable的數量建立n個ZipObserver,與n個Observable一一關聯。每一個ZipObserver內部維護了一個隊列,當上遊Observable發射事件的時候,對應的ZipObserver會把事件放入隊列,而後執行drain方法。drain的工做就是遍歷全部的ZipObserver,若是全部的ZipObserver的隊列都有事件,就按照Function#apply中的規則,轉換成最終的事件,交給真正的Observer處理。若是存在ZipObserver的隊列中沒有事件,drain方法結束,等待下一次的drain。當事件較少的Observable完成全部的事件發射,那麼整個zip過程準備結束。(drain方法有點繞,具體的實現也比我說的更爲複雜一些,有興趣能夠自行查閱代碼)
總結