Rxjava+Retrofit结合开发的封装技巧

概述

在开发中使用RxJava+Retrofit的网络框架,是时下的趋势,使用起来也非常的方便.
如果能够在一定程度上进一步封装,能够大大提高我们的开发效率.接下来我们看一下比较常用的简洁处理场景.

CreateObservable

我们都知道创建一个Observable可以使用RxJava的创建操作符,比如Create,..等等,
effective-rxjava中作者介绍了一种将functions转化为observable的方式,感觉非常新颖,果断使用了.

通过 deferjust 操作符方便的将Func0转化为 Observable

1
2
3
4
5
6
7
8
9
/**
* @return an {@link Observable} that emits invokes {@code function} upon subscription and emits
* its value
*/

public static <O> Observable<O> makeObservable(final Func0<O> function) {
checkNotNull(function);

return Observable.defer(() -> Observable.just(function.call()));
}

我们都知道defer还有一个好处就是只有订阅时才会生效,而just中的参数采用了泛型化,
即我们可以将任意一个有返回值的方法都使用 defer,产生即时的数据流.

1
2
3
4
5
private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

使用方式

1
2
3
4
5
6
7
8
9
public Observable<Optional<ContentItem>> fetchContentItem(
final ContentItemIdentifier contentItemId) {
return subscribeOnScheduler(() -> mContentDatabase.fetchContentItem(contentItemId));
}
// more delegating methods follow here ...
private <T> Observable<T> subscribeOnScheduler(final Func0<T> function) {
return ObservableUtils.makeObservable(function)
.subscribeOn(mScheduler);
}

Transformer

Don’t break the chain: use RxJava’s compose() operator中作者建议使用compose来避免打破RxJava的链式调用,compose中需要传入一个Transformer,我们先来看一下Transformer的源码.

1
2
3
4
5
6
7
/**
* Transformer function used by {@link #compose}.
* @warn more complete description needed
*/

public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}

从中我们可以看到Transformer是一个Func1,其将一种类型的Observable转换成另一种类型的Observable

常见的Transformer用法

  • 线程切换

RxJava中最常用的某过于 线程切换 了,我们定义一个线程切换的RxSchedulerTransformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RxSchedulerTransformer<T> implements Observable.Transformer<T, T> {

private static final RxSchedulerTransformer<Object> INSTANCE = new RxSchedulerTransformer<>();

public static <T> RxSchedulerTransformer<T> instance() {
return (RxSchedulerTransformer<T>) INSTANCE;
}

private RxSchedulerTransformer() {
}

@Override public Observable<T> call(Observable<T> tObservable) {
return tObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
}
  • 错误处理

一般和服务器交互取回的json数据结构类似下面这种,当success == true的时候才是正确的返回值,
success == false 的时候,虽然不会走 RxJavaOnError,但是也是异常.我们希望的是 所有的 异常都在
OnError中处理.OnNext只关心正确的返回值即可.

1
2
3
4
5
{
"success": false, // 是否成功
"code": "500", // 响应码
"data": "" // 内容
}

根据如上json,我们的Transformers可定义为如下格式,对结果进行一些预处理,只有正常值才返回JavaBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RxHandleResultTransformer<T>
implements Observable.Transformer<Transformers.Result<T>, T> {


private static final RxHandleResultTransformer<Object> INSTANCE =
new RxHandleResultTransformer<>();

public static <T> RxHandleResultTransformer<T> instance() {
return (RxHandleResultTransformer<T>) INSTANCE;
}

private RxHandleResultTransformer() {
}

@Override public Observable<T> call(Observable<Transformers.Result<T>> resultObservable) {
return resultObservable.flatMap(tResult -> {
if (tResult.success) {
return Observable.just(tResult.data);
} else {
return Observable.error(new RuntimeException(tResult.code));
}
});
}

final public class Result<T> {
boolean success;
String code;
T data;
}
}

自定义操作符

compose不同的是,自定义Operator 作用于Observable发射的单独的数据项,compose作用于整个流, 自定义操作符是和lift一起使用的,自定义操作符需要实现Operator

更多关于自定义操作符的介绍;
实现自己的操作符
RxJava操作符(十)自定义操作符

三级缓存

其实Retrofit是可以处理缓存的,相关介绍:Retrofit2.0使用总结及注意事项,
这里需要注意的是Retrofit缓存需要使用@GET才生效,而且是使用的文件存储.

关于RxJava缓存参考:RxJava使用场景小结,用RxJava的方式来处理缓存问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
}).onErrorReturn(throwable->null);

Observable<String> network = Observable.just("network").onErrorReturn(throwable -> null);

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first().subscriber(observer);

这里用到了concatonErrorReturn两个操作符,关于concat(连接操作符),官方解释

关于onErrorReturn(错误处理操作符),可以看这里:RxJava错误处理

屏幕切换

存在如下两种问题:

  1. 我们知道 屏幕切换等配置发生改变的时候,会导致Activity的重建,当我们订阅了某一个Subscribtion后,屏幕发生了改变,
    及调用了unsubscribe方法,如何才能保证Subscribtion的延续呢?

  2. AndroidContext是导致很多 内存泄漏的罪魁祸首.如果我们创建的subscribtion持有了Context将会变得十分的危险,
    如果Observable没有准时完成,就很容易导致内存泄漏.

第一种问题,可以用RxJava的缓存机制解决,就是cache(或是replay()),

1
2
3
4
5
6
7
8
Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

第二个问题的解决方案就是在生命周期的某个时刻取消订阅。采用CompositeSubscription来管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 CompositeSubscription mCompositeSubscription;

public void addSubscription(Subscription s) {
if (this.mCompositeSubscription == null) {
this.mCompositeSubscription = new CompositeSubscription();
}
if (null != s)
{
this.mCompositeSubscription.add(s);
}
}

public void unsubscribe() {
if (this.mCompositeSubscription != null) {
this.mCompositeSubscription.clear();
}
}

@Override protected void onDestroy() {
super.onDestroy();
unsubscribe();
}

RxAndroid 中好用的方法

RxAndroid为我们提供了很多好用的API,如HandlerThreadScheduler,AndroidObservable,ViewObservable
其中HandlerThreadScheduler是一个可以绑定到Handler上的scheduler,
AndroidObservable可以绑定到 ActivityFragment,方便生命周期的管理.同时可以方便的创建一个BroadCastReceiverObservable,
ViewObservable用于给View添加绑定,如 ViewObservable.clicks()(监听View的点击事件)或者 ViewObservable.text()(监听TextView的内容变化)

1
2
3
4
5
6
7
8
9
10
AndroidObservable.bindActivity(this, retrofitService.getImage(url))
.subscribeOn(Schedulers.io())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
.subscribe(intent -> handleConnectivityChange(intent));

ViewObservable.clicks(mCardNameEditText, false)
.subscribe(view -> handleClick(view));

参考:RxJava使用场景小结
RxJava + Retrofit 的实际应用场景
Don’t break the chain: use RxJava’s compose() operator
effective-rxjava
实现自己的操作符
RxJava操作符(十)自定义操作符