并发

线程的并发方式一般有两种:

  • 异步执行一个任务,不需要返回结果
  • 需要异步执行返回结果

第一种,直接往线程池中仍Runnable对象,代码大概如下 :

Executors.newCachedThreadPool().execute(new Runnable() {
    @Override
    public void run() {
        //do something
    }
});

第二种,一般有两种情况:

  • 主线程block到数据
  • 设置回调

JDK提供的方式就是主线程block到数据,类似如下代码:

Future<Object> future = Executors.newCachedThreadPool().submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        return null;
    }
});

Object data = future.get();//block直到拿到数据

这种方式缺点很显然,主线程阻塞,有时候跟同步没啥区别了。而JDK对Future设置回调没有相应的方法,Guava扩展了这一点,引入ListenableFuture,可以对ListenableFuture设置回调。

ListenableFuture

获得ListenableFuture实例有两种方法:

  • 向装饰后的线程池中提交callable
  • 将普通的future转换为listenableFuture

装饰线程池

Guava提供了方法将JDK的线程池装饰成一个“可监听的线程池”。看如下代码:

@Test
public void test18() throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3, new ThreadFactory() {
        private AtomicLong index = new AtomicLong(0);

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "commons-thread-" + index.incrementAndGet());
        }
    });
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPool);
    ListenableFuture<String> listenableFuture = listeningExecutorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(3000);
            //System.out.println(1 / 0);
            return "world";
        }
    });
    listenableFuture.addListener(new Runnable() {
        @Override
        public void run() {
            System.out.println("can't get return value");
        }
    }, MoreExecutors.directExecutor());

    Futures.addCallback(listenableFuture, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("the result of future is: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println("exception:" + t.getMessage());
        }
    });

    Thread.sleep(5000);
}

上面代码先定义了普通的线程池,Guava通过MoreExecutors.listeningDecorator(threadPool)将普通的线程池装饰成一个可监听的线程池,向装饰后的线程池中提交callable,就可以获得listenableFuture实例了。

可以对listenableFuture设置listener,同时指定listen执行的线程池,也可以设置回调,获取listenableFuture的结果。

可以在callable内部捕获异常,也可以抛出,在回调的onFailure中处理异常。

上面演示的是对JDK普通的线程池,当然也可以对时间调度的线程池进行装饰。示例:

/**
 * 演示定时调度线程池装饰成listen的效果
 *
 * @throws InterruptedException
 */
@Test
public void test19() throws InterruptedException {
    ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
    //只有callable才对应有future
    ListenableScheduledFuture<?> listenableScheduledFuture = listeningScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello world");
        }
    }, 5, 3, TimeUnit.SECONDS);

    //因为上面的传的是runnable,所以没有返回值,没有返回值就不会触发future的callBack
    Futures.addCallback(listenableScheduledFuture, new FutureCallback<Object>() {
        @Override
        public void onSuccess(Object result) {
            System.out.println("not result: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    });

    Thread.sleep(12000);
}

装饰后的可监听的时间调度线程池使用方式和原来一样。这里调度了一个runnable对象。延迟5s后,每3s执行一次。因为runnable没有返回值,所以下面的回调不会被调用。当然可以向装饰后的线程池中提交callable,这样回调就会执行。如下:

@Test
public void test20() throws InterruptedException {
    ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
    ListenableScheduledFuture<String> schedule = listeningScheduledExecutorService.schedule(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "world";
        }
    }, 3, TimeUnit.SECONDS);
    Futures.addCallback(schedule, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("hello " + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    });

    Thread.sleep(4000);
}

以下部分内容意义不大,纯粹为了比较Guava装饰后的和JDK时间调度线程池区别 装饰后的线程池可以灵活的取消提交到其中的任务。先看下JDK的原生的情况:

@Test
public void test21() throws InterruptedException, ExecutionException {
    FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "world";
        }
    });
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
    ScheduledFuture<?> schedule = scheduledExecutorService.schedule(futureTask, 3, TimeUnit.SECONDS);
    Thread.sleep(2000);
    //如果执行下面的代码就抛出异常,事实上主线程等2s,这个task还没有执行,是应该可以取消的。但是却抛出异常
    if (!futureTask.isDone())
    {
        futureTask.cancel(false);
    }
    System.out.println(futureTask.get());
    Thread.sleep(3000);
}

上述代码执行会抛出异常,理论上还没执行的任务是应该可以取消的。看下Guava装饰后的情况:

@Test
public void test22() throws InterruptedException {
    ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "world";
        }
    });
    ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
    listeningScheduledExecutorService.schedule(listenableFutureTask, 3, TimeUnit.SECONDS);
    Thread.sleep(2000);
    if (!listenableFutureTask.isCancelled()) {
        //如果为true,正在执行的任何会中断,false 正在执行的任务继续执行,直到完成
        if (listenableFutureTask.cancel(false)) {
            System.out.println("task is cancelled..");
        }
    }
    Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("hello " + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    });

    Thread.sleep(4000);
}

Guava提供的ListenableFutureTask提交到一个装饰后的时间调度线程池,未执行的任务可以随时取消,方便很多啊。工作中很少碰到这种场景,以上纯粹为了比较。

将Future转换为ListenerableFuture

Guava提供了方法将普通的future转换为listenerableFuture,以便于添加回调。

/**
 * 演示将jdk的future转换为ListenableFuture
 */
@Test
public void test24() {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    System.out.println(Thread.currentThread().getName());
    Future<String> future = threadPool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName());
            return "world";
        }
    });
    ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(future);
    Futures.addCallback(listenableFuture, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println(Thread.currentThread().getName() + ": hello " + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    }, threadPool);//MoreExecutors.directExecutor();这种方式拿到的线程池还是当前的现场环境,还是同步的。
}

通过JdkFutureAdapters.listenInPoolThread(future)转换一个普通的future。设置回调时,可以指定回调执行的线程池。

Guava提供的listenerableFuture处理可以设置回调,还能通过function进行变换。

ListenerableFuture变换

ListenerableFuture可以通过AsyncFunction进行变换,这种变换跟函数式编程中的变换差不多,都是数据格式的变换。下面代码模拟先从某个地方获得一个key,然后通过key获得一批数据。实现了Integer->List<String>变换,变换通过Futures.transform()实现,可以提供一个线程池异步执行变换。

@Test
public void test26() throws InterruptedException {
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
    ListenableFuture<Integer> idFuture = listeningExecutorService.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            // get primary key id
            return 1;
        }
    });

    AsyncFunction<Integer, List<String>> idToResult = new AsyncFunction<Integer, List<String>>() {
        @Override
        public ListenableFuture<List<String>> apply(Integer input) throws Exception {
            //input is primary key id, get result from db use input id
            Thread.sleep(3000);
            List<String> result = new ArrayList<String>();
            result.add("zhangsan");
            result.add("lisi");
            result.add("wangwu");
            return Futures.immediateFuture(result);
        }
    };
    ListenableFuture<List<String>> resultFuture = Futures.transform(idFuture, idToResult, Executors.newCachedThreadPool());
    Futures.addCallback(resultFuture, new FutureCallback<List<String>>() {
        @Override
        public void onSuccess(List<String> result) {
            System.out.print("get the result: " + result.toString());
        }

        @Override
        public void onFailure(Throwable t) {
        }
    });

    Thread.sleep(5000);
}

以上可以看到Guava对JDK并发编程的改进,通过对JDK线程池进行装饰或将一个JDK Future转换成一个ListenerableFuture,实现对Future添加回调,通过AsyncFunction可以转换Future。

线程就是程序的一个执行路径,从这个角度看,多个线程之间的关系有两种:分裂和汇聚。借用逻辑电路上的两个术语是fan-out(扇出)和fan-in(扇入)。

Fan-out(扇出)

逻辑电路中对扇出的定义如下: 扇出(fan-out):是一个定义单个逻辑门能够驱动的数字信号输入最大量的专业术语。

扇出表达了一个逻辑门电路的驱动能力。类似于如下电路图:

用作线程中的概念做对比,在线程执行路径的某处,分裂出多个线程。更广泛的意义,多个线程的执行依赖于一个线程的执行结果。这种列子很常见,比如:生成了一条技师周报,需要以app通知和短信通知的方式告诉用户。形如下面的执行路径:

\/\/todo 插入图片

app通知和短信通知显然可以并行在多个线程中执行,且依赖于于技师周报的生成。Guava如何实现线程的扇出呢?

Guava实现扇出

Guava实现扇出其实很简单,只要将多个不同的回调设置到一个ListenerableFuture上,然后设置回调执行的线程池就可以了。看如下列子:

/**
 * 演示guava的fan-out(扇出)
 */
@Test
public void test27() throws InterruptedException {
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    ListenableFuture<String> future = listeningExecutorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "message content";
        }
    });
    //一个ListenableFuture 驱动了两个回调(扇出)fan-out
    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println(Thread.currentThread().getName() + ": send message to user phone: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    }, Executors.newCachedThreadPool());

    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println(Thread.currentThread().getName() + ": send message as app notify:" + result);
        }

        @Override
        public void onFailure(Throwable t) {
        }
    }, Executors.newCachedThreadPool());
    System.out.println(Thread.currentThread().getName());
    Thread.sleep(3000);
}

Guava添加多个回调需要注意:

  • 即使没有任何回调,提交到线程池的callable仍然会执行
  • 即使提交到线程池的callable已经执行完,事后再添加回调,这个回调依然会执行
  • 添加多次回调,不会触发callable执行多次,永远只有一次

Guava的这些特点和RxJava不同,在RxJava中:

  • 没有subscribe设置回调,Observable不会执行
  • Observable设置了第一个回调开始立即执行,若Observable已经执行完,再添加回调,还会触发Observable执行
  • 添加多次回调,Observable会执行多次

看如下RxJava的代码,可以看出这些不同:

@Test
public void test19() throws InterruptedException {
    Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //do anything you want and return a string result
            System.out.println("thread1: " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1 finished...");
            subscriber.onNext("hello");
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());

    observable1.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

    Thread.sleep(1000);//1s 保证Observable一定是执行完的

    //再添加回调,还会触发Observable执行
    observable1.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });
    Thread.sleep(5000);
}

执行的结果如下:

thread1: RxCachedThreadScheduler-1

task1 finished...

hello

thread1: RxCachedThreadScheduler-1

task1 finished...

hello

RxJava这种执行的方式和fan-out的语义并不太一致,因为每次添加回调都要重新执行Observable,不过有一种规避的方式是将Observable加上cache()操作符。这样Observable就只会执行一次,将结果缓存。但感觉只是规避了,并不严格意义的fan-out。RxJava不能更好的实现这种模型么?有的!

RxJava实现扇出

RxJava的publish操作符可以改变Observable的执行方式。改变主要有:

  • 添加回调不会触发Observable执行
  • publish.connect()是Observable执行的唯一方式,即使没有任何回调
  • Observable只会执行一次,只有在Observable执行完成前添加的回调才会执行

看如下例子:

@Test
public void test42_() throws InterruptedException {
    ConnectableObservable<Long> publish = Observable.interval(1, TimeUnit.SECONDS).publish();
    publish.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("action1: "+aLong);
        }
    });

    publish.connect();

    Thread.sleep(2000);

    publish.subscribe(new Action1<Long>() {

        @Override
        public void call(Long aLong) {
            System.out.println("action2: "+aLong);
        }
    });

    Thread.sleep(5000);
}

定义了一个从0开始,每1s发射一个逐渐递增整数的Observable,并通过publish操作符改变Observable的执行方式。在添加一个回调后开始通过publish.connect()触发Observable的执行,然后sleep 2s又添加另一个回调。Observable和添加到线程池中的callable不同的地方是,Observable是异步多值的,在添加第二个回调的时候,Observable还在发射值,但是并不会从0开始从头发射,看下执行的结果就知道了:

action1: 0

action1: 1

action1: 2

action2: 2

action1: 3

action2: 3

action1: 4

action2: 4

action1: 5

action2: 5

action1: 6

action2: 6

可以看到前2s发射的0,1只有action1打印出来了,但是后面开始action1和action2都能接收到后面发射的值。

普通的Observable就想视频网站上的普通视频,每个观众点播的时候,都从头开始播放,每个观众看到的内容都是完整的。但是publish操作符将一个Observable变成了现场直播,直播是通过publish.connect()触发的,即使没有任何观众也开始播,中途来了一个观众,并不会从头开始播,中途开始的观众只能接收到还未播出的内容,错过的就错过了。

通过publish操作符实现fan-out是很容易的,只要先将多个回调设置到Observable上,然后在最后通过publish.connect()触发Observable执行就可以了。代码如下:

/**
 * RxJava版
 */
@Test
public void test27_() {
    ConnectableObservable<String> publish = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("message content");
            subscriber.onCompleted();
        }
    }).observeOn(Schedulers.io()).publish();

    publish.subscribe(new Action1<String>() {
        @Override
        public void call(String result) {
            System.out.println(Thread.currentThread().getName() + ": send message to user phone: " + result);
        }
    });

    publish.subscribe(new Action1<String>() {
        @Override
        public void call(String result) {
            System.out.println(Thread.currentThread().getName() + ": send message as app notify:" + result);
        }
    });

    publish.connect();
}

RxJava中跟publish操作符相关的还有refCount和share操作符。refCount操作符将一个ConnectableObservable变为普通的Observable,是publish的逆操作,但是一次publish-refCount后,得到的Observable和原始的有啥不同的呢?因为publish是所有的订阅者都共享原Observable发射的数据,但是必须通过connect方法触发原Observable发射数据,refCount是让原Observable再次能通过至少一个订阅者发射数据,所以在两个连接来操作的语义是:

  • 原Observable发射的数据所有订阅者共享
  • 原Observable通过订阅者开始发射数据,无需通过connect方法

而publish().refCount()等价于share(),也许为了书写方便吧,而share本身也有共享的意思。

@Test
public void test43_() throws InterruptedException {
    //经过publish().refCount() 这个Observable就像一个直播,一个观众没有话并不会播,但是只要有一个
    //观众,就开始播,且后面来的观众并不会重头开始直播,直播本身是共享的,refCount用于跟踪观众数量
    //将publish().refCount()换成share()效果是等价的,看下share的源码就知道了
    Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS).share();

    interval.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("a--->" + aLong);
        }
    });

    Thread.sleep(3000);

    //前面a订阅的时候开始播,这里播在什么地方,就从什么地方开始拿数据
    interval.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("      b--->" + aLong);
        }
    });

    Thread.sleep(5000);
}

可以看到并不需要通过connect触发原Observable执行。且Observable发射的数据是共享的。执行的结果如下:

a--->0

a--->1

a--->2

a--->3

   b--->3

a--->4

   b--->4

a--->5

   b--->5

a--->6

   b--->6

a--->7

   b--->7

Fan-in(扇入)

扇出可以看成是线程执行路径在某处分裂为多个线程,而扇入就可以看成是多个线程执行在某处进行汇聚了。类似于下图:

而扇入在开发中,根据多个汇聚的线程执行的任务是否相同,可以分为两种:

  • 所有汇聚线程执行任务相同
  • 所有汇聚线程执行任务不完全相同

第一种情况是很常见的,比如有一个TechnicianId的List,需要获得Technician的信息。这时可以起多个线程并发的获取Technician信息,最后汇聚结果,这样多个线程执行的任务都相同,都是根据TechnicianId取Technician。类似下图:

第二种情况有时候也会碰到,如完整的展示晒一晒页面信息,需要获取:点赞信息、图片信息、评论信息等。这些信息都是通过不同的服务调用获取的,获取他们又是相互独立的,这样可以在不同的线程中做各自的任务,最后将结果汇聚。类似下图:

Guava和RxJava如何实现两种扇入呢?

Guava实现扇入

Guava实现fan-in也分上面的两种情况,先看下第一种比较简单的,代码如下:

/**
 * 演示fan in 多个线程执行路径相同的情况
 * @throws InterruptedException
 */
@Test
public void test29_() throws InterruptedException {
    List<Integer> technicianIds = Lists.newArrayList(1, 2, 3, 4, 5);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    ArrayList<ListenableFuture<String>> listenableFutures = Lists.newArrayList(FluentIterable.from(technicianIds).transform(new Function<Integer, ListenableFuture<String>>() {
        @Override
        public ListenableFuture<String> apply(Integer technicianId) {
            return listeningExecutorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    //get technician info by technicianId
                    return "technicianInfo" + technicianId;
                }
            });
        }
    }));
    //只要输入的List中又一个Future失败,就会触发fanInFuture失败
    ListenableFuture<List<String>> fanInFuture = Futures.allAsList(listenableFutures);

    Futures.addCallback(fanInFuture, new FutureCallback<List<String>>() {
        @Override
        public void onSuccess(List<String> result) {
            System.out.println("success get result: " + result.toString());
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println("exception: " + t.getMessage());
        }
    }, Executors.newCachedThreadPool());

    Thread.sleep(2000);
}

代码模拟一个有一个technicianId的List,然后并行的获取Technician信息,最后将每个线程的结果汇聚,得到最终结果,设置回调拿到结果。 通过Futures.allAsList()将多个ListenableFuture的List汇聚成一个包含多个String结果的ListenableFuture。最后通过回调拿到结果。

注意:

  • 最终List中的结果和listenableFutures中的顺序是一致的
  • Futures.allAsList()方式汇聚,只要多个线程中有一个线程发生异常,就触发最后结果异常,执行onFailure

还有另一种汇聚方式Futures.successfulAsList(),如果多个线程中有一个发生异常,不会触发结果异常,还是执行onSuccess,只不过结果List对应的位置上结果为null,实际应用中根据需要选择合适的方式。

第二种,不同线程执行任务不同,然后汇聚,跟第一种差不多,只不过手动一个个地向装饰后的线程池中提交callable。代码如下:

@Test
public void test33() throws InterruptedException {
    ListenableFuture<Object> strFuture = Futures.immediateFuture((Object) "a");//想象这是帖子
    ListenableFuture<Object> intFuture = Futures.immediateFuture((Object) 1);//点赞
    ListenableFuture<Object> doubleFuture = Futures.immediateFuture((Object) (new Double(33.333)));//...
    ListenableFuture<List<Object>> listListenableFuture = Futures.successfulAsList(strFuture, intFuture, doubleFuture);
    Futures.addCallback(listListenableFuture, new FutureCallback<List<Object>>() {
        @Override
        public void onSuccess(List<Object> result) {
            System.out.println(result.toString());
        }

        @Override
        public void onFailure(Throwable t) {
        }
    }, Executors.newCachedThreadPool());
    Thread.sleep(3000);
}

当不同的任务返回值不同的时候,将他们转换为Object,在最终的结果顺序和Futures.successfulAsList()顺序是一致的,再强转回来。上面执行的结果:

[a, 1, 33.333]

可以看到Guava在实现fan-in模型还是比较方便的。接下来看看RxJava如何实现fan-in呢?

RxJava实现扇入

同样,对RxJava实现扇入分为两种情况,先看下多个线程执行任务相同的情况,还是上面的例子,代码如下:

/**
 * 通过flatMap-toList实现执行路径相同的fan-in
 */
@Test
public void test29__() {
    List<Integer> technicianIds = Lists.newArrayList(1, 2, 3, 4, 5);
    List<String> result = Observable.from(technicianIds)
            .flatMap(new Func1<Integer, Observable<String>>() {
                @Override
                public Observable<String> call(Integer technicianId) {
                    return Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            subscriber.onNext("technicianInfo" + technicianId);
                            subscriber.onCompleted();
                        }
                    }).subscribeOn(Schedulers.io());
                }
            }, 10)//指定线程池中最大线程数
            .toList()//汇聚结果
            .toBlocking()
            .single();

    System.out.println(result);
}

RxJava版的看上去好像更简洁一点。本例中没有设置回调,直接block到了数据,如果线程中发生异常,最外层就会抛出异常,如果设置回调,可以在回调的异常处理中处理。我一般在flatMap内部通过try-catch处理。

对于多个线程执行任务不同的情况,RxJava可以通过zip操作符对多个线程执行的结果进行汇聚,看如下代码:

@Test
public void test18() throws InterruptedException {
    //一个Observable就是一个并行单位,Observable通过onNext发射数据,而一切操作符都是通过Func发射的返回值
    //将数据继续向下传递
    Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //do anything you want and return a string result
            System.out.println("thread1: " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1 finished...");
            subscriber.onNext("zhangsan");
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());//想象这返回的评论

    //可以看到这里的三个Observable返回值都是不同的
    Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            //do anything you want and return a integer result
            System.out.println("thread2: " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 finished...");
            subscriber.onNext(29);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());//想象着返回的是点赞信息

    Observable<Student> observable3 = Observable.create(new Observable.OnSubscribe<Student>() {
        @Override
        public void call(Subscriber<? super Student> subscriber) {
            //do anything you want and return a Student result
            System.out.println("thread3: " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task3 finished...");
            List<String> corses = Lists.newArrayList("math", "computor");
            Student s = new Student(corses, "aaaa");
            subscriber.onNext(s);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io());//...

    //三个Observable并发单元在这里通过zip操作符汇聚,并通过Func将三个并发单元的结果进行处理
    Observable.zip(observable1, observable2, observable3, new Func3<String, Integer, Student, String>() {
        @Override
        public String call(String s, Integer integer, Student student) {
            System.out.println(String.format("args: %s,%s,%s",s,integer,JSON.toJSONString(student)));

            //对前面三个并行处理的结果进行汇聚并处理
            return "name:" + s + " age:" + integer + " student: " + JSON.toJSONString(student);
        }
    }).subscribeOn(Schedulers.io()).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

    Thread.sleep(5000);
}

Student类就省略了,不影响理解程序逻辑。代码执行结果如下:

thread1: RxCachedThreadScheduler-2

thread2: RxCachedThreadScheduler-3

thread3: RxCachedThreadScheduler-4

task3 finished...

task2 finished...

task1 finished...

args: zhangsan,29,{"corses":["math","computor"],"name":"aaaa"}

name:zhangsan age:29 student: {"corses":["math","computor"],"name":"aaaa"}

RxJava还有一个zipWith操作符,跟zip作用是一样的,只不过一个是静态方法,一个是对象方法。

当然RxJava在数据处理上的操作符还有很多,能够以链式的操作对数据进行处理。在开发中,有的时候用的RxJava,有时候用Guava,不过感觉RxJava更灵活。但是Guava中有很多东西,RxJava并不能代替,反正看情况吧。

Fork-Join

JDK7中提供了fork-join框架,fork-join是解决问题的一种思想:分而治之。

分而治之这种思想有点像Map-Reduce。fork-join和map-reduce不同的地方在于,任务是如何被分成小任务的。

  • fork-join是通过递归的方式,设定一个问题小到足够解决界限,也叫基本条件,当基本条件满足时,就解决这样足够小的问题,当不满足时,就递归的将问题继续变小,直到满足基本条件为止。fork-join是对问题的解决过程中,边解决,边变小,最终解决足够小的问题。最后再自底向上、由小到大不断合并不同规模的问题的解,最后得到原始问题的解。fork的过程就是不断递归把问题变小的过程,join就是不断的由小到大不断合并不同规模问题的解的过程。
  • map-reduce是一开始就将一个大问题分解成n个小问题,然后n个小问题独立的去解决,解决的过程中问题的规模不会改变,不会变小。然后将n个小问题规模的解合并为原始问题的解。map的过程就是开始分解大问题的过程,这个分解就是独立的划分,没有递归的过程,也没有设定一个基本条件,分解完后就开始解决各个小问题,最后通过reduce合并。

map-reduce的各个任务可以在多个线程中解决,也可以在独立的不同机器中解决,这就是Hadoop做的事情。fork-join在不断fork子任务的时候,也是创建线程去解决子任务。

fork-join解决问题的思路如下:

将一个大的任务递归的变成规模更小的任务,多个线程分别去解决这些小的任务,为了减少线程间的任务竞争,每个线程都有自己的任务队列,但是某些线程可能执行的更快,自己的任务队列为空,这时候会“窃取”别的线程的队列中的任务,这个过程叫“任务窃取”(work-steal),为了在窃取时减少竞争,窃取时发生在队列的另一端的,即每个线程的任务队列是一个双向队列。这个过程如下图:

fork-join中的业务类有:ForkJoinTask,表示一个ForkJoin任务,这个类有两个子类

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask :用于有返回结果的任务。

只要继承这两个子任务就可以了。

fork-join实现扇入

从上面的介绍看,fork-join是可以实现扇入的。不过比较适合扇入的第一种情况:所有的执行线程执行的任务相同。因为递归的将问题规模变小,问题的性质并没有变化。 还是类似之前的列子,假如有有一个userId的list,并发的得到List<UserInfo>的结果。

/**
 * 功能描述:  fork-join实现扇入<p>
 *
 * @author : jinpeng.chen <p>
 * @version 1.0 2016-11-17
 */
public class ForkJoinTest2 {

    public static class UserInfo {
        String name;
        int age;

        public UserInfo(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "UserInfo{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }

    public static class GetUserInfoTask extends RecursiveTask<List<UserInfo>> {
        List<Integer> userIds = new ArrayList<>();

        private int start;
        private int end;

        public GetUserInfoTask(int start, int end, List<Integer> userIds) {
            this.start = start;
            this.end = end;
            this.userIds = userIds;
        }

        public GetUserInfoTask(List<Integer> userIds) {
            this.start = 0;
            this.end = userIds.size() - 1;
            this.userIds = userIds;
        }

        @Override
        protected List<UserInfo> compute() {
            System.out.println("thread--->" + Thread.currentThread().getName());
            List<UserInfo> result = new ArrayList<>();
            if (start == end) { 
                Integer userId = userIds.get(start);
                //get user info
                result.add(new UserInfo("user" + userId, new Random().nextInt(30)));
            } else {
                int mid = (start + end) / 2;
                GetUserInfoTask leftTask = new GetUserInfoTask(start, mid,userIds);
                GetUserInfoTask rightTask = new GetUserInfoTask(mid + 1, end, userIds);
                ForkJoinTask<List<UserInfo>> leftFork = leftTask.fork();
                ForkJoinTask<List<UserInfo>> rightFork = rightTask.fork();
                result.addAll(leftFork.join());
                result.addAll(rightFork.join());
            }
            return result;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> userIds = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        GetUserInfoTask task = new GetUserInfoTask(userIds);
        ForkJoinPool pool = new ForkJoinPool(3);
        ForkJoinTask<List<UserInfo>> forkJoinTask = pool.submit(task);
        System.out.println(forkJoinTask.get());
    }
}

当问题的规模只有一个时,直接解决。否则将问题分为前后两部分,不断的分,直到满足基本条件。然后通过join合并。

results matching ""

    No results matching ""