响应式编程RxJava之Single

简单入门

Single.just(T value);

通常使用RxJava,主要是为了异步处理一些任务。即传入一些值,经过逻辑处理之后返回结果。

以上代码中的just(T value),可以看作是传入参数。

有了输入,如何输出呢?我们讲到了异步处理,那自然得有一个异步回调才行吧,这简直理所应当吧。所以这里介绍最重要的"subscribe"概念。

Single.just(T value).subscribe();

例如:

int addValue(int a, int b) {
    return a + b;
}

// ps:不管是否subscribe(),只要使用just(),addValue()都会执行
Single.just(addValue(1, 2)).subscribe();

异步任务知道如何执行了,那咱们该想办法拿到回调了!

Single.just(addValue(1, 2)).subscribe(new SingleSubscriber<Integer>() {    
    @Override    
    public void onSuccess(Integer value) {
        // value = 3
    }
    @Override
    public void onError(Throwable error) {}
});

这里我们使用一个叫SingleSubscriber的对象接收回调。其中onSuccess(T value)会接收just(T value)的传入值,当addValue()方法抛异常的时候会自动调用onError()。

上述just()方法无论如何都只会在当前线程里执行。所以即使看上去有异步的过程,但其实这是个同步的过程!

Single高阶

compose操作符[*]

创建一个自定义的操作符,将某种范型的Single转换为另一种范型一般用不到,示例如下,将Integer的Single转为String Single。

Single.just(addValue(1, 2))
        .compose(new Single.Transformer<Integer, String>() {
            @Override
            public Single<String> call(Single<Integer> integerSingle) {
                return integerSingle.map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {                        
                        return String.valueOf(integer + 2);
                    }
                });
            }
        })
        .subscribe(new SingleSubscriber<String>() {
            @Override
            public void onSuccess(String value) {
                // value = 5
            }
            @Override
            public void onError(Throwable error) {}
        });

之前的例子中我们创建了一个Integer的Single其中返回值是一加二的结果三,但是我们其实需要的结果并不是整型的三是字符串三又或是其他类型的对象,这个时候我们就需要结合map()操作符转换一下Single。map()操作符我们之后会讲,只要知道它可以用来转换类型就行。这里值得注意的是compose()方法可以指定线程运行,即可以指定Schedulers。如果不懂不要担心,以后还会介绍。

concat操作符[*]

用来连接多个Single和Observable发射的数据。

Single.concat(Single.just(checkNetwork()), Single.just(checkMemory()), Single.just(doSth()))

仅仅用来连接Single顺序执行的,比如顺序执行检查网络,检查内存,执行任务,注意:如果某个Single调用了onError()会导致被中断。

create操作符[***]

// 作用同Single.just(addValue(1, 2));
Single.create(new Single.OnSubscribe<Integer>() {
    @Override
    public void call(SingleSubscriber<? super Integer> singleSubscriber) {
        singleSubscriber.onSuccess(addValue(1, 2));
    }
});

// 常见的示例,这是一个异步操作
Single.create(new Single.OnSubscribe<Integer>() {
    @Override
    public void call(SingleSubscriber<? super Integer> singleSubscriber) {
        // 这里被指定在IO线程
        singleSubscriber.onSuccess(addValue(1, 2));
    }
}).subscribeOn(Schedulers.io())// 指定运行在IO线程
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {            }
    @Override
    public void onError(Throwable e) {    }
    @Override
    public void onNext(Integer o) {  
        // o = 3
    }
});

值得注意的是之前我们使用的just()是一种特殊的create(),它不能指定Schedulers。只能在当前线程中运行,而create()可以指定Schedulers实现异步处理。且just()不管是否被subscribe()订阅均会被调用,而create()如果不被订阅是不会被调用的。所以我们通常可以用just()传递简单参数,而用create()处理复杂异步逻辑。

error操作符[*]

返回一个立即给订阅者发射错误通知的Single,一般用于调试,不常用。

// 如人为让concat中断:
Single.concat(s1, Single.error(new Throwable("error"), s2)).subscribe();
flatMap操作符[*]

flatMap<T,R>基本上等同于map(),唯一的区别在于flatMap的R一般用于返回Observable对象,这样随后的subscribe参数可以使用原始类型。详见代码

Single.just(1).flatMap(new Func1<Integer, Single<String>>() {
    @Override
    public Single<String> call(Integer x) {
        return Single.just(x + "");
    }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
    @Override
    public void call(String s) {
        LogHelper.e("_flatMap:"+s);
    }});

Single.just(1).map(new Func1<Integer, Single<String>>() {
    @Override
    public Single<String> call(Integer x) {
        return Single.just(x + "");
    }}).subscribe(new Action1<Single<String>>() {
    @Override
    public void call(Single<String> s) {// 注意这里返回值的区别
        LogHelper.e("_flatMap:"+s);
    }});
一般map()是用于一对一的返回,而flatMap()用于一对0~多的返回。比如我们看下面这个例子:

static Observable<List<String>> query() {
    List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift"); 
    return Observable.just(s);
}
// 注意这里的参数是 query所返回的Observable的输出,并且返会一个Observable<String>
query().flatMap(new Func1<List<String>, Observable<String>>() { 
    @Override 
    public Observable<String> call(List<String> strings) {
        //结合from处理 return Observable.from(strings);
       }}).subscribe(new Action1<String>() {
          @Override 
          public void call(String s) { 
              System.out.println("_flatMap:"+s);
          }
});

输出:

_flatMap:Java
_flatMap:Android
_flatMap:Ruby
_flatMap:Ios
_flatMap:Swift

这里传入了一个List<String>,传出了多个String。而且应该多用于Observable,很少用在Single中,即使用也不如map()来的爽快,这里只做了解即可。

flatMapObservable操作符[**]

刚刚说到flatMap()和map()类似,区别在于flatMap可以返回多个值,而map只能返回一个。但在Single中flatMap只能返回Single,几乎等同于map实用性不高。而flatMapObservable就不同了,它支持将Single转化为Observable对象,可以返回多个值。下面这个例子介绍如何将Single转化为Observable。

Single.just(1).flatMapObservable(new Func1<Integer, Observable<String>>() {
    @Override
    public Observable<String> call(Integer integer) {
        return Observable.just("H", "3", "c");
    }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
    @Override
    public void call(String s) {
        LogHelper.e("kk:"+s); 
   }
});

这里传入一个整型1,输出"H","3","c"三个字符串。

from操作符[*]

Single的from操作符仅允许传入一个java Future对象,由于java Future几乎很少使用,所以该操作符在Single中没什么实用意义。

map操作符[***]

Single.just(1).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "x";
    }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
    @Override
    public void call(String s) {
        LogHelper.e("kk:"+s);
    }
});

map操作符之前有介绍过,用于类型一对一转换,比较简单。

merge & mergeWith操作符[*]

merge操作符类似于concat,他们的区别见下图

subscribeOn操作符[***]

用于指定异步任务的线程,常见的有:

Schedulers.computation( );// 计算线程
Schedulers.from(executor);// 自定义
Schedulers.immediate();// 当前线程
Schedulers.io();// io线程
Schedulers.newThread();// 创建新线程
Schedulers.trampoline();// 当前线程队列执行

onErrorReturn操作符[***]

相当于try catch中的return,具体意思就是当函数抛出错误的时候给出一个返回值,看代码:

Single.create(new Single.OnSubscribe<Integer>() {
    @Override
    public void call(SingleSubscriber<? super Integer> singleSubscriber) {
        singleSubscriber.onError(new Throwable("x"));
    }}).onErrorReturn(new Func1<Throwable, Integer>() {
    @Override
    public Integer call(Throwable throwable) {
        return 2;
    }}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer s) {
        LogHelper.e("kk:"+s);
    }
});

返回结果2

observeOn

指定回调所在线程

// 常见的为,即Android UI线程
AndroidSchedulers.mainThread();

timeout操作符[***]

超时操作操作,在指定时间内如果没有调用onSuccess()就判定为失败,且可支持失败的时候调用其他Single()

toSingle操作符[*]

将传入一个参数的Observable转换为Single

Observable.just(1).toSingle();

zip & zipWith操作符[**]

如果说flatMap()是将一个Single变成多个的操作,那么zip刚刚相反,他可以将多个Single整合为一个

Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
    @Override
    public String call(Integer o, Integer o2) {
        LogHelper.e("A:" + o + "=" + o2);
        return null;
    }}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        LogHelper.e("kk:"+s);
    }
});

示例

Single.just(1)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer + "";
            }
        })
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                // result
            }
        });