RXjava简单使用

一、什么是 RxJava?

RxJava 是一个响应式编程框架,采用观察者设计模式。所以自然少不了 Observable 和 Subscriber 这两个东东了。

RxJava 是一个开源项目,地址:https://github.com/ReactiveX/RxJava

还有一个RxAndroid,用于 Android 开发,添加了 Android 用的接口。地址:https://github.com/ReactiveX/RxAndroid

二、例子

通过请求openweathermap 的天气查询接口返回天气数据

1、增加编译依赖

dependencies {
compile fileTree(dir: ‘libs’, include: [‘*.jar’])
compile ‘com.android.support:appcompat-v7:22.0.0’
compile ‘io.reactivex:rxjava:1.0.9’
compile ‘io.reactivex:rxandroid:0.24.0’
compile ‘com.squareup.retrofit:retrofit:1.9.0’
}
retrofit 是一个 restful 请求客户端。详见:http://square.github.io/retrofit/

2、服务器接口

/**

  • 接口
  • Created by Hal on 15/4/26.
    */
    public class ApiManager {

private static final String ENDPOINT = “http://api.openweathermap.org/data/2.5“;

/**

  • 服务接口
    */
    private interface ApiManagerService {
    @GET(“/weather”)
    WeatherData getWeather(@Query(“q”) String place, @Query(“units”) String units);
    }

private static final RestAdapter restAdapter = new RestAdapter.Builder().setEndpoint(ENDPOINT).setLogLevel(RestAdapter.LogLevel.FULL).build();

private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);

/**

  • 将服务接口返回的数据,封装成{@link rx.Observable}
  • @param city
  • @return
    */
    public static Observable getWeatherData(final String city) {
    return Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber<? super WeatherData> subscriber) {
    //订阅者回调 onNext 和 onCompleted
    subscriber.onNext(apiManager.getWeather(city, “metric”));
    subscriber.onCompleted();
    }
    }).subscribeOn(Schedulers.io());
    }
    }
    订阅者的回调有三个方法,onNext,onError,onCompleted

3、接口调用

/**

  • 多个 city 请求
  • map,flatMap 对 Observable进行变换
    /
    Observable.from(CITIES).flatMap(new Func1>() {
    @Override
    public Observable call(String s) {
    return ApiManager.getWeatherData(s);
    }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/
    onNext/new Action1() {
    @Override
    public void call(WeatherData weatherData) {
    Log.d(LOG_TAG, weatherData.toString());
    }
    }, /
    onError*/new Action1() {
    @Override
    public void call(Throwable throwable) {

}
});

/**

  • 单个 city 请求
    */
    ApiManager.getWeatherData(CITIES[0]).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1() {
    @Override
    public void call(WeatherData weatherData) {
    Log.d(LOG_TAG, weatherData.toString());
    ((TextView) findViewById(R.id.text)).setText(weatherData.toString());
    }
    }, new Action1() {
    @Override
    public void call(Throwable throwable) {
    Log.e(LOG_TAG, throwable.getMessage(), throwable);
    }
    });

/**

  • Android View 事件处理
    */
    ViewObservable.clicks(findViewById(R.id.text), false).subscribe(new Action1() {
    @Override
    public void call(OnClickEvent onClickEvent) {

}
});

subscribeOn(Schedulers.io())与observeOn(AndroidSchedulers.mainThread())分别定义了这两个动作的线程。Android UI 更新需要在主线程。
4、retrofit 支持 rxjava 整合

/**

  • 服务接口
    */
    private interface ApiManagerService {
    @GET(“/weather”)
    WeatherData getWeather(@Query(“q”) String place, @Query(“units”) String units);

/**

  • retrofit 支持 rxjava 整合
  • 这种方法适用于新接口
    */
    @GET(“/weather”)
    Observable getWeatherData(@Query(“q”) String place, @Query(“units”) String units);
    }
    Demo 代码: https://github.com/foolingdutchman/EverExample