并发
线程的并发方式一般有两种:
- 异步执行一个任务,不需要返回结果
- 需要异步执行返回结果
第一种,直接往线程池中仍Runnable对象,代码大概如下 :
Executors.newCachedThreadPool().execute(new Runnable() {
@Override
public void run() {
//do something
}
});
第二种,一般有两种情况:
- 主线程block到数据
- 设置回调
JDK提供的方式就是主线程block到数据,类似如下代码:
Future<Object> future = Executors.newCachedThreadPool().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
Object data = future.get();//block直到拿到数据
这种方式缺点很显然,主线程阻塞,有时候跟同步没啥区别了。而JDK对Future设置回调没有相应的方法,Guava扩展了这一点,引入ListenableFuture,可以对ListenableFuture设置回调。
ListenableFuture
获得ListenableFuture实例有两种方法:
- 向装饰后的线程池中提交callable
- 将普通的future转换为listenableFuture
装饰线程池
Guava提供了方法将JDK的线程池装饰成一个“可监听的线程池”。看如下代码:
@Test
public void test18() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3, new ThreadFactory() {
private AtomicLong index = new AtomicLong(0);
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "commons-thread-" + index.incrementAndGet());
}
});
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPool);
ListenableFuture<String> listenableFuture = listeningExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
//System.out.println(1 / 0);
return "world";
}
});
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
System.out.println("can't get return value");
}
}, MoreExecutors.directExecutor());
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("the result of future is: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("exception:" + t.getMessage());
}
});
Thread.sleep(5000);
}
上面代码先定义了普通的线程池,Guava通过MoreExecutors.listeningDecorator(threadPool)
将普通的线程池装饰成一个可监听的线程池,向装饰后的线程池中提交callable,就可以获得listenableFuture实例了。
可以对listenableFuture设置listener,同时指定listen执行的线程池,也可以设置回调,获取listenableFuture的结果。
可以在callable内部捕获异常,也可以抛出,在回调的onFailure中处理异常。
上面演示的是对JDK普通的线程池,当然也可以对时间调度的线程池进行装饰。示例:
/**
* 演示定时调度线程池装饰成listen的效果
*
* @throws InterruptedException
*/
@Test
public void test19() throws InterruptedException {
ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
//只有callable才对应有future
ListenableScheduledFuture<?> listenableScheduledFuture = listeningScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("hello world");
}
}, 5, 3, TimeUnit.SECONDS);
//因为上面的传的是runnable,所以没有返回值,没有返回值就不会触发future的callBack
Futures.addCallback(listenableScheduledFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
System.out.println("not result: " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(12000);
}
装饰后的可监听的时间调度线程池使用方式和原来一样。这里调度了一个runnable对象。延迟5s后,每3s执行一次。因为runnable没有返回值,所以下面的回调不会被调用。当然可以向装饰后的线程池中提交callable,这样回调就会执行。如下:
@Test
public void test20() throws InterruptedException {
ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
ListenableScheduledFuture<String> schedule = listeningScheduledExecutorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
return "world";
}
}, 3, TimeUnit.SECONDS);
Futures.addCallback(schedule, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("hello " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(4000);
}
以下部分内容意义不大,纯粹为了比较Guava装饰后的和JDK时间调度线程池区别 装饰后的线程池可以灵活的取消提交到其中的任务。先看下JDK的原生的情况:
@Test
public void test21() throws InterruptedException, ExecutionException {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
return "world";
}
});
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(futureTask, 3, TimeUnit.SECONDS);
Thread.sleep(2000);
//如果执行下面的代码就抛出异常,事实上主线程等2s,这个task还没有执行,是应该可以取消的。但是却抛出异常
if (!futureTask.isDone())
{
futureTask.cancel(false);
}
System.out.println(futureTask.get());
Thread.sleep(3000);
}
上述代码执行会抛出异常,理论上还没执行的任务是应该可以取消的。看下Guava装饰后的情况:
@Test
public void test22() throws InterruptedException {
ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
return "world";
}
});
ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
listeningScheduledExecutorService.schedule(listenableFutureTask, 3, TimeUnit.SECONDS);
Thread.sleep(2000);
if (!listenableFutureTask.isCancelled()) {
//如果为true,正在执行的任何会中断,false 正在执行的任务继续执行,直到完成
if (listenableFutureTask.cancel(false)) {
System.out.println("task is cancelled..");
}
}
Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("hello " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(4000);
}
Guava提供的ListenableFutureTask提交到一个装饰后的时间调度线程池,未执行的任务可以随时取消,方便很多啊。工作中很少碰到这种场景,以上纯粹为了比较。
将Future转换为ListenerableFuture
Guava提供了方法将普通的future转换为listenerableFuture,以便于添加回调。
/**
* 演示将jdk的future转换为ListenableFuture
*/
@Test
public void test24() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
System.out.println(Thread.currentThread().getName());
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName());
return "world";
}
});
ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(future);
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(Thread.currentThread().getName() + ": hello " + result);
}
@Override
public void onFailure(Throwable t) {
}
}, threadPool);//MoreExecutors.directExecutor();这种方式拿到的线程池还是当前的现场环境,还是同步的。
}
通过JdkFutureAdapters.listenInPoolThread(future)
转换一个普通的future。设置回调时,可以指定回调执行的线程池。
Guava提供的listenerableFuture处理可以设置回调,还能通过function进行变换。
ListenerableFuture变换
ListenerableFuture可以通过AsyncFunction进行变换,这种变换跟函数式编程中的变换差不多,都是数据格式的变换。下面代码模拟先从某个地方获得一个key,然后通过key获得一批数据。实现了Integer->List<String>
变换,变换通过Futures.transform()
实现,可以提供一个线程池异步执行变换。
@Test
public void test26() throws InterruptedException {
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
ListenableFuture<Integer> idFuture = listeningExecutorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// get primary key id
return 1;
}
});
AsyncFunction<Integer, List<String>> idToResult = new AsyncFunction<Integer, List<String>>() {
@Override
public ListenableFuture<List<String>> apply(Integer input) throws Exception {
//input is primary key id, get result from db use input id
Thread.sleep(3000);
List<String> result = new ArrayList<String>();
result.add("zhangsan");
result.add("lisi");
result.add("wangwu");
return Futures.immediateFuture(result);
}
};
ListenableFuture<List<String>> resultFuture = Futures.transform(idFuture, idToResult, Executors.newCachedThreadPool());
Futures.addCallback(resultFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
System.out.print("get the result: " + result.toString());
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(5000);
}
以上可以看到Guava对JDK并发编程的改进,通过对JDK线程池进行装饰或将一个JDK Future转换成一个ListenerableFuture,实现对Future添加回调,通过AsyncFunction可以转换Future。
线程就是程序的一个执行路径,从这个角度看,多个线程之间的关系有两种:分裂和汇聚。借用逻辑电路上的两个术语是fan-out(扇出)和fan-in(扇入)。
Fan-out(扇出)
逻辑电路中对扇出的定义如下: 扇出(fan-out):是一个定义单个逻辑门能够驱动的数字信号输入最大量的专业术语。
扇出表达了一个逻辑门电路的驱动能力。类似于如下电路图:
用作线程中的概念做对比,在线程执行路径的某处,分裂出多个线程。更广泛的意义,多个线程的执行依赖于一个线程的执行结果。这种列子很常见,比如:生成了一条技师周报,需要以app通知和短信通知的方式告诉用户。形如下面的执行路径:
\/\/todo 插入图片
app通知和短信通知显然可以并行在多个线程中执行,且依赖于于技师周报的生成。Guava如何实现线程的扇出呢?
Guava实现扇出
Guava实现扇出其实很简单,只要将多个不同的回调设置到一个ListenerableFuture上,然后设置回调执行的线程池就可以了。看如下列子:
/**
* 演示guava的fan-out(扇出)
*/
@Test
public void test27() throws InterruptedException {
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> future = listeningExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "message content";
}
});
//一个ListenableFuture 驱动了两个回调(扇出)fan-out
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(Thread.currentThread().getName() + ": send message to user phone: " + result);
}
@Override
public void onFailure(Throwable t) {
}
}, Executors.newCachedThreadPool());
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(Thread.currentThread().getName() + ": send message as app notify:" + result);
}
@Override
public void onFailure(Throwable t) {
}
}, Executors.newCachedThreadPool());
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
}
Guava添加多个回调需要注意:
- 即使没有任何回调,提交到线程池的callable仍然会执行
- 即使提交到线程池的callable已经执行完,事后再添加回调,这个回调依然会执行
- 添加多次回调,不会触发callable执行多次,永远只有一次
Guava的这些特点和RxJava不同,在RxJava中:
- 没有subscribe设置回调,Observable不会执行
- Observable设置了第一个回调开始立即执行,若Observable已经执行完,再添加回调,还会触发Observable执行
- 添加多次回调,Observable会执行多次
看如下RxJava的代码,可以看出这些不同:
@Test
public void test19() throws InterruptedException {
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//do anything you want and return a string result
System.out.println("thread1: " + Thread.currentThread().getName());
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1 finished...");
subscriber.onNext("hello");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
observable1.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Thread.sleep(1000);//1s 保证Observable一定是执行完的
//再添加回调,还会触发Observable执行
observable1.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Thread.sleep(5000);
}
执行的结果如下:
thread1: RxCachedThreadScheduler-1
task1 finished...
hello
thread1: RxCachedThreadScheduler-1
task1 finished...
hello
RxJava这种执行的方式和fan-out的语义并不太一致,因为每次添加回调都要重新执行Observable,不过有一种规避的方式是将Observable加上cache()操作符。这样Observable就只会执行一次,将结果缓存。但感觉只是规避了,并不严格意义的fan-out。RxJava不能更好的实现这种模型么?有的!
RxJava实现扇出
RxJava的publish操作符可以改变Observable的执行方式。改变主要有:
- 添加回调不会触发Observable执行
- publish.connect()是Observable执行的唯一方式,即使没有任何回调
- Observable只会执行一次,只有在Observable执行完成前添加的回调才会执行
看如下例子:
@Test
public void test42_() throws InterruptedException {
ConnectableObservable<Long> publish = Observable.interval(1, TimeUnit.SECONDS).publish();
publish.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("action1: "+aLong);
}
});
publish.connect();
Thread.sleep(2000);
publish.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("action2: "+aLong);
}
});
Thread.sleep(5000);
}
定义了一个从0开始,每1s发射一个逐渐递增整数的Observable,并通过publish操作符改变Observable的执行方式。在添加一个回调后开始通过publish.connect()触发Observable的执行,然后sleep 2s又添加另一个回调。Observable和添加到线程池中的callable不同的地方是,Observable是异步多值的,在添加第二个回调的时候,Observable还在发射值,但是并不会从0开始从头发射,看下执行的结果就知道了:
action1: 0
action1: 1
action1: 2
action2: 2
action1: 3
action2: 3
action1: 4
action2: 4
action1: 5
action2: 5
action1: 6
action2: 6
可以看到前2s发射的0,1只有action1打印出来了,但是后面开始action1和action2都能接收到后面发射的值。
普通的Observable就想视频网站上的普通视频,每个观众点播的时候,都从头开始播放,每个观众看到的内容都是完整的。但是publish操作符将一个Observable变成了现场直播,直播是通过publish.connect()触发的,即使没有任何观众也开始播,中途来了一个观众,并不会从头开始播,中途开始的观众只能接收到还未播出的内容,错过的就错过了。
通过publish操作符实现fan-out是很容易的,只要先将多个回调设置到Observable上,然后在最后通过publish.connect()触发Observable执行就可以了。代码如下:
/**
* RxJava版
*/
@Test
public void test27_() {
ConnectableObservable<String> publish = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("message content");
subscriber.onCompleted();
}
}).observeOn(Schedulers.io()).publish();
publish.subscribe(new Action1<String>() {
@Override
public void call(String result) {
System.out.println(Thread.currentThread().getName() + ": send message to user phone: " + result);
}
});
publish.subscribe(new Action1<String>() {
@Override
public void call(String result) {
System.out.println(Thread.currentThread().getName() + ": send message as app notify:" + result);
}
});
publish.connect();
}
RxJava中跟publish操作符相关的还有refCount和share操作符。refCount操作符将一个ConnectableObservable变为普通的Observable,是publish的逆操作,但是一次publish-refCount后,得到的Observable和原始的有啥不同的呢?因为publish是所有的订阅者都共享原Observable发射的数据,但是必须通过connect方法触发原Observable发射数据,refCount是让原Observable再次能通过至少一个订阅者发射数据,所以在两个连接来操作的语义是:
- 原Observable发射的数据所有订阅者共享
- 原Observable通过订阅者开始发射数据,无需通过connect方法
而publish().refCount()等价于share(),也许为了书写方便吧,而share本身也有共享的意思。
@Test
public void test43_() throws InterruptedException {
//经过publish().refCount() 这个Observable就像一个直播,一个观众没有话并不会播,但是只要有一个
//观众,就开始播,且后面来的观众并不会重头开始直播,直播本身是共享的,refCount用于跟踪观众数量
//将publish().refCount()换成share()效果是等价的,看下share的源码就知道了
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS).share();
interval.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("a--->" + aLong);
}
});
Thread.sleep(3000);
//前面a订阅的时候开始播,这里播在什么地方,就从什么地方开始拿数据
interval.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(" b--->" + aLong);
}
});
Thread.sleep(5000);
}
可以看到并不需要通过connect触发原Observable执行。且Observable发射的数据是共享的。执行的结果如下:
a--->0
a--->1
a--->2
a--->3
b--->3
a--->4
b--->4
a--->5
b--->5
a--->6
b--->6
a--->7
b--->7
Fan-in(扇入)
扇出可以看成是线程执行路径在某处分裂为多个线程,而扇入就可以看成是多个线程执行在某处进行汇聚了。类似于下图:
而扇入在开发中,根据多个汇聚的线程执行的任务是否相同,可以分为两种:
- 所有汇聚线程执行任务相同
- 所有汇聚线程执行任务不完全相同
第一种情况是很常见的,比如有一个TechnicianId的List,需要获得Technician的信息。这时可以起多个线程并发的获取Technician信息,最后汇聚结果,这样多个线程执行的任务都相同,都是根据TechnicianId取Technician。类似下图:
第二种情况有时候也会碰到,如完整的展示晒一晒页面信息,需要获取:点赞信息、图片信息、评论信息等。这些信息都是通过不同的服务调用获取的,获取他们又是相互独立的,这样可以在不同的线程中做各自的任务,最后将结果汇聚。类似下图:
Guava和RxJava如何实现两种扇入呢?
Guava实现扇入
Guava实现fan-in也分上面的两种情况,先看下第一种比较简单的,代码如下:
/**
* 演示fan in 多个线程执行路径相同的情况
* @throws InterruptedException
*/
@Test
public void test29_() throws InterruptedException {
List<Integer> technicianIds = Lists.newArrayList(1, 2, 3, 4, 5);
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ArrayList<ListenableFuture<String>> listenableFutures = Lists.newArrayList(FluentIterable.from(technicianIds).transform(new Function<Integer, ListenableFuture<String>>() {
@Override
public ListenableFuture<String> apply(Integer technicianId) {
return listeningExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
//get technician info by technicianId
return "technicianInfo" + technicianId;
}
});
}
}));
//只要输入的List中又一个Future失败,就会触发fanInFuture失败
ListenableFuture<List<String>> fanInFuture = Futures.allAsList(listenableFutures);
Futures.addCallback(fanInFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
System.out.println("success get result: " + result.toString());
}
@Override
public void onFailure(Throwable t) {
System.out.println("exception: " + t.getMessage());
}
}, Executors.newCachedThreadPool());
Thread.sleep(2000);
}
代码模拟一个有一个technicianId的List,然后并行的获取Technician信息,最后将每个线程的结果汇聚,得到最终结果,设置回调拿到结果。
通过Futures.allAsList()
将多个ListenableFuture的List汇聚成一个包含多个String结果的ListenableFuture。最后通过回调拿到结果。
注意:
- 最终List中的结果和listenableFutures中的顺序是一致的
Futures.allAsList()
方式汇聚,只要多个线程中有一个线程发生异常,就触发最后结果异常,执行onFailure
还有另一种汇聚方式Futures.successfulAsList()
,如果多个线程中有一个发生异常,不会触发结果异常,还是执行onSuccess,只不过结果List对应的位置上结果为null,实际应用中根据需要选择合适的方式。
第二种,不同线程执行任务不同,然后汇聚,跟第一种差不多,只不过手动一个个地向装饰后的线程池中提交callable。代码如下:
@Test
public void test33() throws InterruptedException {
ListenableFuture<Object> strFuture = Futures.immediateFuture((Object) "a");//想象这是帖子
ListenableFuture<Object> intFuture = Futures.immediateFuture((Object) 1);//点赞
ListenableFuture<Object> doubleFuture = Futures.immediateFuture((Object) (new Double(33.333)));//...
ListenableFuture<List<Object>> listListenableFuture = Futures.successfulAsList(strFuture, intFuture, doubleFuture);
Futures.addCallback(listListenableFuture, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(List<Object> result) {
System.out.println(result.toString());
}
@Override
public void onFailure(Throwable t) {
}
}, Executors.newCachedThreadPool());
Thread.sleep(3000);
}
当不同的任务返回值不同的时候,将他们转换为Object,在最终的结果顺序和Futures.successfulAsList()
顺序是一致的,再强转回来。上面执行的结果:
[a, 1, 33.333]
可以看到Guava在实现fan-in模型还是比较方便的。接下来看看RxJava如何实现fan-in呢?
RxJava实现扇入
同样,对RxJava实现扇入分为两种情况,先看下多个线程执行任务相同的情况,还是上面的例子,代码如下:
/**
* 通过flatMap-toList实现执行路径相同的fan-in
*/
@Test
public void test29__() {
List<Integer> technicianIds = Lists.newArrayList(1, 2, 3, 4, 5);
List<String> result = Observable.from(technicianIds)
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer technicianId) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("technicianInfo" + technicianId);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
}, 10)//指定线程池中最大线程数
.toList()//汇聚结果
.toBlocking()
.single();
System.out.println(result);
}
RxJava版的看上去好像更简洁一点。本例中没有设置回调,直接block到了数据,如果线程中发生异常,最外层就会抛出异常,如果设置回调,可以在回调的异常处理中处理。我一般在flatMap内部通过try-catch处理。
对于多个线程执行任务不同的情况,RxJava可以通过zip操作符对多个线程执行的结果进行汇聚,看如下代码:
@Test
public void test18() throws InterruptedException {
//一个Observable就是一个并行单位,Observable通过onNext发射数据,而一切操作符都是通过Func发射的返回值
//将数据继续向下传递
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//do anything you want and return a string result
System.out.println("thread1: " + Thread.currentThread().getName());
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1 finished...");
subscriber.onNext("zhangsan");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());//想象这返回的评论
//可以看到这里的三个Observable返回值都是不同的
Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
//do anything you want and return a integer result
System.out.println("thread2: " + Thread.currentThread().getName());
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2 finished...");
subscriber.onNext(29);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());//想象着返回的是点赞信息
Observable<Student> observable3 = Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
//do anything you want and return a Student result
System.out.println("thread3: " + Thread.currentThread().getName());
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task3 finished...");
List<String> corses = Lists.newArrayList("math", "computor");
Student s = new Student(corses, "aaaa");
subscriber.onNext(s);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());//...
//三个Observable并发单元在这里通过zip操作符汇聚,并通过Func将三个并发单元的结果进行处理
Observable.zip(observable1, observable2, observable3, new Func3<String, Integer, Student, String>() {
@Override
public String call(String s, Integer integer, Student student) {
System.out.println(String.format("args: %s,%s,%s",s,integer,JSON.toJSONString(student)));
//对前面三个并行处理的结果进行汇聚并处理
return "name:" + s + " age:" + integer + " student: " + JSON.toJSONString(student);
}
}).subscribeOn(Schedulers.io()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Thread.sleep(5000);
}
Student类就省略了,不影响理解程序逻辑。代码执行结果如下:
thread1: RxCachedThreadScheduler-2
thread2: RxCachedThreadScheduler-3
thread3: RxCachedThreadScheduler-4
task3 finished...
task2 finished...
task1 finished...
args: zhangsan,29,{"corses":["math","computor"],"name":"aaaa"}
name:zhangsan age:29 student: {"corses":["math","computor"],"name":"aaaa"}
RxJava还有一个zipWith操作符,跟zip作用是一样的,只不过一个是静态方法,一个是对象方法。
当然RxJava在数据处理上的操作符还有很多,能够以链式的操作对数据进行处理。在开发中,有的时候用的RxJava,有时候用Guava,不过感觉RxJava更灵活。但是Guava中有很多东西,RxJava并不能代替,反正看情况吧。
Fork-Join
JDK7中提供了fork-join框架,fork-join是解决问题的一种思想:分而治之。
分而治之这种思想有点像Map-Reduce。fork-join和map-reduce不同的地方在于,任务是如何被分成小任务的。
- fork-join是通过递归的方式,设定一个问题小到足够解决界限,也叫基本条件,当基本条件满足时,就解决这样足够小的问题,当不满足时,就递归的将问题继续变小,直到满足基本条件为止。fork-join是对问题的解决过程中,边解决,边变小,最终解决足够小的问题。最后再自底向上、由小到大不断合并不同规模的问题的解,最后得到原始问题的解。fork的过程就是不断递归把问题变小的过程,join就是不断的由小到大不断合并不同规模问题的解的过程。
- map-reduce是一开始就将一个大问题分解成n个小问题,然后n个小问题独立的去解决,解决的过程中问题的规模不会改变,不会变小。然后将n个小问题规模的解合并为原始问题的解。map的过程就是开始分解大问题的过程,这个分解就是独立的划分,没有递归的过程,也没有设定一个基本条件,分解完后就开始解决各个小问题,最后通过reduce合并。
map-reduce的各个任务可以在多个线程中解决,也可以在独立的不同机器中解决,这就是Hadoop做的事情。fork-join在不断fork子任务的时候,也是创建线程去解决子任务。
fork-join解决问题的思路如下:
将一个大的任务递归的变成规模更小的任务,多个线程分别去解决这些小的任务,为了减少线程间的任务竞争,每个线程都有自己的任务队列,但是某些线程可能执行的更快,自己的任务队列为空,这时候会“窃取”别的线程的队列中的任务,这个过程叫“任务窃取”(work-steal),为了在窃取时减少竞争,窃取时发生在队列的另一端的,即每个线程的任务队列是一个双向队列。这个过程如下图:
fork-join中的业务类有:ForkJoinTask,表示一个ForkJoin任务,这个类有两个子类
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask :用于有返回结果的任务。
只要继承这两个子任务就可以了。
fork-join实现扇入
从上面的介绍看,fork-join是可以实现扇入的。不过比较适合扇入的第一种情况:所有的执行线程执行的任务相同。因为递归的将问题规模变小,问题的性质并没有变化。
还是类似之前的列子,假如有有一个userId的list,并发的得到List<UserInfo>
的结果。
/**
* 功能描述: fork-join实现扇入<p>
*
* @author : jinpeng.chen <p>
* @version 1.0 2016-11-17
*/
public class ForkJoinTest2 {
public static class UserInfo {
String name;
int age;
public UserInfo(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "UserInfo{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
public static class GetUserInfoTask extends RecursiveTask<List<UserInfo>> {
List<Integer> userIds = new ArrayList<>();
private int start;
private int end;
public GetUserInfoTask(int start, int end, List<Integer> userIds) {
this.start = start;
this.end = end;
this.userIds = userIds;
}
public GetUserInfoTask(List<Integer> userIds) {
this.start = 0;
this.end = userIds.size() - 1;
this.userIds = userIds;
}
@Override
protected List<UserInfo> compute() {
System.out.println("thread--->" + Thread.currentThread().getName());
List<UserInfo> result = new ArrayList<>();
if (start == end) {
Integer userId = userIds.get(start);
//get user info
result.add(new UserInfo("user" + userId, new Random().nextInt(30)));
} else {
int mid = (start + end) / 2;
GetUserInfoTask leftTask = new GetUserInfoTask(start, mid,userIds);
GetUserInfoTask rightTask = new GetUserInfoTask(mid + 1, end, userIds);
ForkJoinTask<List<UserInfo>> leftFork = leftTask.fork();
ForkJoinTask<List<UserInfo>> rightFork = rightTask.fork();
result.addAll(leftFork.join());
result.addAll(rightFork.join());
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<Integer> userIds = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
GetUserInfoTask task = new GetUserInfoTask(userIds);
ForkJoinPool pool = new ForkJoinPool(3);
ForkJoinTask<List<UserInfo>> forkJoinTask = pool.submit(task);
System.out.println(forkJoinTask.get());
}
}
当问题的规模只有一个时,直接解决。否则将问题分为前后两部分,不断的分,直到满足基本条件。然后通过join合并。