简单实例详解多线编程的Fork/Join应用( 二 )


 
上面的实现在单CPU机器上运行良好 。但是如果我们有多个可用的CPU,我们可能希望将这些工作分配给可用的核心 。
因此,不需要在两个嵌套的for循环中遍历所有像素,我们可以用ForkJoinPool并为图像的每一行(或每一列)提交一个新任务 。
一旦将一行转换为灰度,当前线程就可以处理下一行 。
这个原则在下面的例子中实现:

简单实例详解多线编程的Fork/Join应用

文章插图
 
在main()方法中,我们使用Java的ImageIO类读取图像 。返回的BufferedImage实例具有我们需要的所有方法 。
我们可以查询行数和列数,并检索和设置每个像素的RGB值 。所以我们要做的就是遍历所有行,并向我们的ForkJoinPool提交一个新的GrayscaleImageAction 。后者收到了关于可用处理器的提示,作为其构造函数的参数 。
现在,通过调用它们的compute()方法,ForkJoinPool可以异步启动任务 。在这个方法中,我们遍历每一行并根据其灰度值更新相应的RGB值 。
将所有任务提交到池后,我们在主线程中等待关闭整个池,然后使用ImageIO.write()方法将更新后的BufferedImage写回磁盘 。
令人惊讶的是,如果不使用可用的处理器,我们只需要几行代码 。这里我们再次通过使用java.util.concurrent包来完成实现 。
简单实例详解多线编程的Fork/Join应用

文章插图
 
ForkJoinPool为提交任务提供了三种不同的方法:
  • execute(ForkJoinTask):此方法异步执行给定的任务 。它没有返回值 。
  • invoke(ForkJoinTask): 此方法等待任务返回值 。
  • submit(ForkJoinTask): 此方法异步执行给定的任务 。它返回对任务本身的引用 。因此,可以使用任务引用来查询结果(因为它实现了Future接口) 。
有了这些知识,我们就很清楚为什么要使用execute()方法提交上面的GrayscaleImageAction 。
如果我们使用invoke(),主线程就会等待任务完成,我们就不会利用可用的并行度 。
当我们仔细研究ForkJoinTask-API时,我们发现了同样的区别:
  • ForkJoinTask.fork(): ForkJoinTask是异步执行的 。它没有返回值 。
  • ForkJoinTask.invoke(): 立即执行ForkJoinTask,并在完成后返回结果 。
ForkJoinPool 和 ExecutorService既然我们已经知道了ExecutorService和ForkJoinPool,您可能会问自己为什么应该使用ForkJoinPool而不是ExecutorService 。
两者之间的差别并不大 。两者都有execute()和submit()方法,并使用一些公共接口的实例,如Runnable、Callable、RecursiveAction或RecursiveTask 。
为了更好地理解这种差异,让我们尝试使用ExecutorService从上面实现GetMinNumb类:
简单实例详解多线编程的Fork/Join应用

文章插图
 
代码看起来非常相似,除了我们将任务提交给ExecutorService,然后使用返回的Future实例来等待结果之外 。
这两个实现之间的主要区别可以在线程池构建的地方找到 。
在上面的例子中,我们创建了一个包含64个线程的固定线程池 。
为什么选择这么大的数字? 这里的原因是,为每个返回的Future调用get()会阻塞当前线程,直到结果可用为止 。
如果我们只向池提供可用cpu数量的线程,那么程序将耗尽资源并无限期挂起 。
ForkJoinPool实现了前面提到的工作窃取策略,即每次运行的线程都必须等待某个结果;
该线程从工作队列中删除当前任务,并执行一些准备运行的其他任务 。
这样,当前线程就不会被阻塞,可以用来执行其他任务 。一旦计算了最初挂起的任务的结果,任务将再次执行,join()方法将返回结果 。
这是与常规ExecutorService的一个重要区别,在常规ExecutorService中,我们必须在等待结果时阻塞当前线程 。

【简单实例详解多线编程的Fork/Join应用】


推荐阅读