Bài 19: Networking

  • Sử dụng Operator Map
/*Sử dụng Operator Map*/
    public void map(){
        Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAnUser/{userId}")
                .addPathParameter("userId", "1")
                .build()
                .getObjectObservable(UserApi.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<UserApi, User>() {
                    @Override
                    public User apply(UserApi userApi) throws Exception {
                        return new User(userApi);
                    }
                }).subscribe(new Observer<User>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(User user) {
                textView.setText(user.toString());
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {

            }
        });
    }
  • Sử dụng zip để phân tích dữ liệu từ 2 Api trả về
/**
     * Trả về List User thích Dế
     */
    private Observable<List<User>> getCricketFansObservable() {
        return Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAllCricketFans")
                .build()
                .getObjectListObservable(User.class);
    }

    /*
    * Trả về danh sách người thích bóng đá
    */
    private Observable<List<User>> getFootballFansObservable() {
        return Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAllFootballFans")
                .build()
                .getObjectListObservable(User.class);
    }

    /*
    * Trả về danh sách người thích cả 2 Dế và bóng đá
    */

    private void findUsersWhoLovesBoth() {
        Observable.zip(getCricketFansObservable(), getFootballFansObservable(), new BiFunction<List<User>, List<User>, List<User>>() {
            @Override
            public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
                return filterUserWhoLovesBoth(cricketFans, footballFans);
            }
        }).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<User>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<User> users) {
                for (User user : users) {
                   textView.append(user.toString());
                    textView.append("\n");
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }

    private List<User> filterUserWhoLovesBoth(List<User> cricketFans, List<User> footballFans) {
        List<User> userWhoLovesBoth = new ArrayList<>();
        for (User cricketFan : cricketFans) {
            for (User footballFan : footballFans) {
                if (cricketFan.id == footballFan.id) {
                    userWhoLovesBoth.add(cricketFan);
                }
            }
        }
        return userWhoLovesBoth;
    }
  • Sử dụng flatmap và filter
 /*Sử dụng flatmap và filter*/
    private Observable<List<User>> getAllMyFriendsObservable() {
        return Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAllFriends/{userId}")
                .addPathParameter("userId", "1")
                .build()
                .getObjectListObservable(User.class);
    }

    public void flatMapAndFilter(){
        getAllMyFriendsObservable()
                .flatMap(new Function<List<User>, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(List<User> users) throws Exception {
                        return Observable.fromIterable(users);//Trả về từng user
                    }
                })
                .filter(new Predicate<User>() {
                    @Override
                    public boolean test(User user) throws Exception {
                        return user.isFollowing;//Trả về những user đang follow tôi
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(User user) {
                        textView.append(user.toString());
                        textView.append("\n");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
  • Sử dụng flatmap:
/*Chỉ sử dụng flatmap*/
    private Observable<List<User>> getUserListObservable() {
        return Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAllUsers/{pageNumber}")
                .addPathParameter("pageNumber", "0")
                .addQueryParameter("limit", "10")
                .build()
                .getObjectListObservable(User.class);
    }

    private Observable<UserDetail> getUserDetailObservable(long id) {
        return Rx2AndroidNetworking.get("https://fierce-cove-29863.herokuapp.com/getAnUserDetail/{userId}")
                .addPathParameter("userId", String.valueOf(id))
                .build()
                .getObjectObservable(UserDetail.class);
    }

    public void flatmap(){
        getUserListObservable()
                .flatMap(new Function<List<User>, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(List<User> users) throws Exception {
                        return Observable.fromIterable(users);//Trả về từng user
                    }
                })
                .flatMap(new Function<User, ObservableSource<UserDetail>>() {
                    @Override
                    public ObservableSource<UserDetail> apply(User user) throws Exception {
                        return getUserDetailObservable(user.id);
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<UserDetail>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(UserDetail userDetail) {
                        textView.setText(userDetail.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

 

  • Sử dụng flatmap kèm với zip:
public void flatMapWithZip(){
        getUserListObservable()
                .flatMap(new Function<List<User>, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(List<User> users) throws Exception {
                        return Observable.fromIterable(users);
                    }
                })
                .flatMap(new Function<User, ObservableSource<Pair<UserDetail, User>>>() {
                    @Override
                    public ObservableSource<Pair<UserDetail, User>> apply(User user) throws Exception {
                        return Observable.zip(getUserDetailObservable(user.id), Observable.just(user), new BiFunction<UserDetail, User, Pair<UserDetail, User>>() {
                            @Override
                            public Pair<UserDetail, User> apply(UserDetail userDetail, User user) throws Exception {
                                return new Pair<>(userDetail, user);
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Pair<UserDetail, User>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Pair<UserDetail, User> userDetailUserPair) {
                        UserDetail userDetail = userDetailUserPair.first;
                        User user = userDetailUserPair.second;
                        textView.append(userDetail.toString());
                        textView.append("\n");
                        textView.append(user.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

 

Bài 18: Tạo chức năng search

  • Tạo lớp RxSearchObservable
package com.rxjava2.android.samples.ui.search;

import android.widget.SearchView;

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

/**
 * Created by amitshekhar on 15/10/17.
 */

public class RxSearchObservable {

    private RxSearchObservable() {
        // no instance
    }

    public static Observable<String> fromView(SearchView searchView) {

        final PublishSubject<String> subject = PublishSubject.create();

        searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
            @Override
            public boolean onQueryTextSubmit(String s) {
                subject.onComplete();
                return true;
            }

            @Override
            public boolean onQueryTextChange(String text) {
                subject.onNext(text);
                return true;
            }
        });

        return subject;
    }
}

 

  • Trong main:
package com.rxjava2.android.samples.ui.search;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.widget.SearchView;
import android.widget.TextView;

import com.rxjava2.android.samples.R;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

/**
 * Created by amitshekhar on 15/10/17.
 */

public class SearchActivity extends AppCompatActivity {

    public static final String TAG = SearchActivity.class.getSimpleName();
    private SearchView searchView;
    private TextView textViewResult;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_search);
        searchView = (SearchView) findViewById(R.id.searchView);
        textViewResult = (TextView) findViewById(R.id.textViewResult);

        setUpSearchObservable();
    }

    private void setUpSearchObservable() {
        RxSearchObservable.fromView(searchView)
                .debounce(300, TimeUnit.MILLISECONDS)
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String text) throws Exception {
                        if (text.isEmpty()) {
                            textViewResult.setText("");
                            return false;
                        } else {
                            return true;
                        }
                    }
                })
                .distinctUntilChanged()//
                .switchMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String query) throws Exception {
                        return dataFromNetwork(query);//Gọi Api
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String result) throws Exception {
                        textViewResult.setText(result);
                    }
                });
    }

    /**
     * Simulation of network data
     */
    private Observable<String> dataFromNetwork(final String query) {
        return Observable.just(true)
                .delay(2, TimeUnit.SECONDS)
                .map(new Function<Boolean, String>() {
                    @Override
                    public String apply(@NonNull Boolean value) throws Exception {
                        return query;
                    }
                });
    }

}

 

Bài 17: Flatmap, Switchmap, Concatmap

  • Đặc điểm của FlatMap:

        • Operator flatMap sẽ không quan tâm đến thứ tự của các phần tử. Nó sẽ tạo một Observable mới cho mỗi phần tử và không liên quan gì đến nhau. Có phần tử sẽ emit nhanh, có phần tử emit chậm bởi vì trước đó mình đã tạo một đoạn delay ngẫu nhiên cho các phần tử.
        • Ví dụ:
          @Test
          public void flatMap() throws Exception {
              final List<String> items = Lists.newArrayList("a", "b", "c", "d", "e", "f");
          
              final TestScheduler scheduler = new TestScheduler();
          
              Observable.from(items)
                      .flatMap( s -> {
                          final int delay = new Random().nextInt(10);
                          return Observable.just(s + "x")
                                  .delay(delay, TimeUnit.SECONDS, scheduler);
                      })
                      .toList()
                      .doOnNext(System.out::println)
                      .subscribe();
          
              scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
          }
          
          

      Kết quả: [cx, ex, fx, bx, dx, ax]

  • Đặc điểm của SwitchMap:

      • Xem kết quả bạn cũng hiểu rồi đúng không 😀 Nôm na là khi một phần tử mới được emit, thì nó sẽ huỷ(unsubcribe) Observable được tạo ra trước đó và sẽ chạy Observable mới
      • Ví dụ:

    @Test
    public void switchMap() throws Exception {
        final List<String> items = Lists.newArrayList("a", "b", "c", "d", "e", "f");
    
        final TestScheduler scheduler = new TestScheduler();
    
        Observable.from(items)
                .switchMap( s -> {
                    final int delay = new Random().nextInt(10);
                    return Observable.just(s + "x")
                            .delay(delay, TimeUnit.SECONDS, scheduler);
                })
                .toList()
                .doOnNext(System.out::println)
                .subscribe();
    
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    }
    

    Kết quả: [fx]

  • Đặc điểm của ConcatMap:

      • ConcatMap hoạt động gần giống với flatMap, nhưng thứ tự của các phần tử sau khi emit được giữ lại như trước. Nhưng concatMap có một vấn đề lớn đó là nó sẽ chờ cho mỗi observable hoàn thành xong công việc rồi mới chạy đến phần tử tiếp theo (Kiểu như synchronus vậy)
      • Ví dụ:
        @Test
        public void switchMap() throws Exception {
            final List<String> items = Lists.newArrayList("a", "b", "c", "d", "e", "f");
        
            final TestScheduler scheduler = new TestScheduler();
        
            Observable.from(items)
                    .concatMap( s -> {
                        final int delay = new Random().nextInt(10);
                        return Observable.just(s + "x")
                                .delay(delay, TimeUnit.SECONDS, scheduler);
                    })
                    .toList()
                    .doOnNext(System.out::println)
                    .subscribe();
        
            scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        

    Kết quả: [ax, bx, cx, dx, ex, fx]

 

Bài 16: Operator Debounce (Thời gian chờ phát ra item)

  • Đặc điểm sử dụng:

    • Chỉ phát ra 1 item trong khoảng thời gian lớn hơn thời gian chờ.
  • Ví dụ:
package com.rxjava2.android.samples.ui.operators;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import com.rxjava2.android.samples.R;
import com.rxjava2.android.samples.utils.AppConstant;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

/**
 * Created by amitshekhar on 22/12/16.
 */

public class DebounceExampleActivity extends AppCompatActivity {

    private static final String TAG = DebounceExampleActivity.class.getSimpleName();
    Button btn;
    TextView textView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example);
        btn = (Button) findViewById(R.id.btn);
        textView = (TextView) findViewById(R.id.textView);

        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                doSomeWork();
            }
        });
    }

    /*
    * Using debounce() -> only emit an item from an Observable if a particular time-span has
    * passed without it emitting another item, so it will emit 2, 4, 5 as we have simulated it.
    */
    private void doSomeWork() {
        getObservable()
                /*Trong vòng 500 mili nếu không có phát ra thì sẽ bỏ qua*/
                .debounce(500, TimeUnit.MILLISECONDS)
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);//Đợi 400 giây phát ra < 500 => không phat

                emitter.onNext(2); // deliver
                Thread.sleep(505);//Đợi 505 giây phát ra > 500 =>  phat

                emitter.onNext(3); // skip
                Thread.sleep(100);//Đợi 100 giây phát ra < 500 => không phat

                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        });
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : ");
                textView.append(AppConstant.LINE_SEPARATOR);
                textView.append(" value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext ");
                Log.d(TAG, " value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }

}

Kết quả:

D/DebounceExampleActivity: onSubscribe : false
D/DebounceExampleActivity: onNext
D/DebounceExampleActivity: value : 2
D/DebounceExampleActivity: onNext
D/DebounceExampleActivity: value : 4
D/DebounceExampleActivity: onNext
D/DebounceExampleActivity: value : 5
D/DebounceExampleActivity: onComplete

Bài 14: Operator Concat (Hộp item trả về lần lượt)

  • Đặc điểm khi sử dụng

    • Hợp nhất các kết quả từ các Observable, sau đó trả về từng item lần lượt.
  • Ví dụ:

package com.rxjava2.android.samples.ui.operators;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import com.rxjava2.android.samples.R;
import com.rxjava2.android.samples.utils.AppConstant;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * Created by amitshekhar on 27/08/16.
 */
public class ConcatExampleActivity extends AppCompatActivity {

    private static final String TAG = ConcatExampleActivity.class.getSimpleName();
    Button btn;
    TextView textView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example);
        btn = (Button) findViewById(R.id.btn);
        textView = (TextView) findViewById(R.id.textView);

        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                doSomeWork();
            }
        });
    }

    /*
     * Using concat operator to combine Observable : concat maintain
     * the order of Observable.
     * It will emit all the 7 values in order
     * here - first "A1", "A2", "A3", "A4" and then "B1", "B2", "B3"
     * first all from the first Observable and then
     * all from the second Observable all in order
     */
    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        /*Nhận vào danh sách rồi trả về tuần tự các phần tử*/
        Observable.concat(aObservable, bObservable)
                .subscribe(getObserver());
    }


    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }


}

Kết quả:
D/ConcatExampleActivity: onSubscribe : false
D/ConcatExampleActivity: onNext : value : A1
D/ConcatExampleActivity: onNext : value : A2
D/ConcatExampleActivity: onNext : value : A3
D/ConcatExampleActivity: onNext : value : A4
D/ConcatExampleActivity: onNext : value : B1
D/ConcatExampleActivity: onNext : value : B2
D/ConcatExampleActivity: onNext : value : B3
D/ConcatExampleActivity: onComplete