5.RxJava线程控制Scheduler

前言


RxJavaScheduler 就是用来控制线程,比如我写的一段代码想要在新的线程中运行,使用observeOn(Schedulers.newThread())就可以了。

例子


先来一个demo

1
2
3
4
5
6
7
8
9
10
Observable.just("test") //发射数据
.observeOn(Schedulers.newThread()) //切到新线程
.map(str -> str+"1") //将数据加上字符串1
.subscribeOn(Schedulers.immediate())//指定观察者执行的线程
.subscribe(new Action1<String>() { //订阅者
@Override
public void call(String s) {
System.out.println(s);
}
});

设置线程


subscribeOn(): 设置subscribe()订阅所发生的线程,即 call() 执行的线程。只有第一次设置生效,之后多次调用无效

observeOn(): 设置Observer运行的线程,设置后线程马上切换。

线程控制器


Schedulers提供了几种线程控制器

Schedulers.computation() 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate() 在当前线程立即开始执行任务
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread() 为每个任务创建一个新线程
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行