引言
《java中不太常见的东西》这个模块已经好久没写了,今天写一个java中自带的分布式处理方式Fork/Join。Fork/Join在JDK1.7的时候引入,它某种程度上可以实现简单的map-reduce操作。笔者目前整理的一些blog针对面试都是超高频出现的。大家可以点击链接:http://blog.csdn.net/u012403290。
技术点
1、map-reduce
处理大数据的编程模型,分为”Map(映射)”和”Reduce(归约)”两部分。应用于分布式编程的情况,可以尽可能提升运算效率和速度。通俗来说就是把一个很大的任务,拆分为很多小任务,然后有各自的线程去处理这些小任务,最后把结果统一起来。
2、产生背景
其实Fork/Join处理一定程度的数据,核心建立于目前水平发展的多核计算机技术,它表达了一种充分利用资源的概念。在如今的计算机领域多核处理器早已是主流,而且并发编程讲究多线程处理问题,对计算机资源利用达到一个新的高度。
Fork/Join结构
正确的使用Fork/Join框架,需要一定熟悉它的结构,对于一个分布式的任务,必然具备两种条件:①任务调度;②任务执行。在Fork/Join中,我们主要用它自定义的线程池来提交任务和调度任务,称之为:ForkJoinPool;同时我们有它自己的任务执行类,称之为:ForkJoinTask。
不过我们不直接使用ForkJoinTask来直接执行和分解任务,我们一般都使用它的两个子类,RecursiveAction和RecursiveTask,其中,前者主要处理没有返回结果的任务,后者主要处理有返回结果的任务。总结一下,一下就是Fork/Join的基本模型:
接下来我们一部分一部分来分析一下他们各自的结构:
①ForkJoinPool:
网上很多解释ForkJoinPool的源码已经非常老了,在JDK1.8中已经不再继续维护ForkJoinTask和ForkJoinWorkerThread这两个数组了,前者是一个个任务,后者是执行任务的线程。它现在的模式是形成了一个内部类:WorkQueue,下面是它在JDK1.8中的源码:
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. The @Contended annotation alerts
* JVMs to try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
// Instance fields
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor
int nsteals; // number of steals
int hint; // randomization and stealer index hint
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
}
仔细阅读源码我们发现,现在的结构和原来完全不一样了。本来我们需要从ForkJoinTask数组中把任务分发给ForkJoinWorkerThread来执行。而现在,用一个内部类workQueue来完成这个任务,在workQueue中存在一个ForkJoinWorkerThread表示这个队列的执行者,同时在workQueue的成员变量中,我们发现有一个ForkJoinTask数组,这个数组是这个Thread需要执行的任务。
阅读这个内部类的描述,我们发现这个queue还支持线程的任务窃取,什么叫线程的任务窃取呢?就是说你和你的一个伙伴一起吃水果,你的那份吃完了,他那份没吃完,那你就偷偷的拿了他的一些水果吃了。存在执行2个任务的子线程,这里要讲成存在A,B两个个WorkQueue在执行任务,A的任务执行完了,B的任务没执行完,那么A的WorkQueue就从B的WorkQueue的ForkJoinTask数组中拿走了一部分尾部的任务来执行,可以合理的提高运行和计算效率。
我们不深入了解源码,这并不是这篇博文的本意。接下来我们看看ForkJoinPool中提交任务的几个方法:
a、submit
/**
* Submits a ForkJoinTask for execution.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
b、execute
/**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
c、invoke
/**
* Performs the given task, returning its result upon completion.
* If the computation encounters an unchecked Exception or Error,
* it is rethrown as the outcome of this invocation. Rethrown
* exceptions behave in the same way as regular exceptions, but,
* when possible, contain stack traces (as displayed for example
* using {@code ex.printStackTrace()}) of both the current thread
* as well as the thread actually encountering the exception;
* minimally only the latter.
*
* @param task the task
* @param <T> the type of the task's result
* @return the task's result
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
这3种任务提交方法还是有所差别的,在submit中提交了一个任务之后,会异步开始执行任务同时返回这个任务,而 execute会异步执行这个任务但是没有任何返回。而invoke会异步开始执行任务,直接返回一个结果。
②ForkJoinTask:
在ForkJoinTask中我们就简单介绍fork和join这两种操作,以下是fork方法的源码:
// public methods
/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);//把当前线程添加到workQueue中
else
ForkJoinPool.common.externalPush(this);//直接执行这个任务
return this;
}
在fork方法中,它会先判断当前的线程是否属于ForkJoinWorkerThread线程,如果属于这个线程,那么就把线程添加到workQueue中,否则就直接执行这个任务。
以下是join方法:
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)//判断任务是否正常,否则要报告异常
reportException(s);
return getRawResult();//返回结果
}
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);//如果任务执行完了,那么就设置为NORMAL
}
return s;
}
在join的操作主要是判断当前任务的执行状态和返回结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。
在doJoin()方法里,首先通过查看任务的状态,通过doExec方法去判断任务是否执行完毕,如果执行完了,则直接返回任务状态,如果没有执行完,就等待继续执行。如果任务顺利执行完成了,则设置任务状态为NORMAL,如果出现异常,则需要报告异常。
用代码实现Fork/Join实现大数据计算
如果真的要很详细的去介绍Fork/join源码,貌似需要更进一步的去钻研,很多底层的的东西还涉及到了一些乐观锁。我们不继续深究了,我们尝试用fork/join来实现大数列的计算,同时我们尝试把它和一般的计算方式做比较,看看哪个效率更高。
需求:
计算1+2+3+……..+N的和
以下是我实现的用Fork/Join进行计算,主要的核心思想就是把超大的计算拆分为小的计算,通俗来说就是把一个极大的任务拆分为很多个小任务,下面是核心计算模型:
下面是代码实现:
package com.brickworkers;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class FockJoinTest extends RecursiveTask<Long>{//继承RecursiveTask来实现
//设立一个最大计算容量
private final int DEFAULT_CAPACITY = 10000;
//用2个数字表示目前要计算的范围
private int start;
private int end;
public FockJoinTest(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {//实现compute方法
//分为两种情况进行出来
long sum = 0;
//如果任务量在最大容量之内
if(end - start < DEFAULT_CAPACITY){
for (int i = start; i < end; i++) {
sum += i;
}
}else{//如果超过了最大容量,那么就进行拆分处理
//计算容量中间值
int middle = (start + end)/2;
//进行递归
FockJoinTest fockJoinTest1 = new FockJoinTest(start, middle);
FockJoinTest fockJoinTest2 = new FockJoinTest(middle + 1, end);
//执行任务
fockJoinTest1.fork();
fockJoinTest2.fork();
//等待任务执行并返回结果
sum = fockJoinTest1.join() + fockJoinTest2.join();
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
FockJoinTest fockJoinTest = new FockJoinTest(1, 100000000);
long fockhoinStartTime = System.currentTimeMillis();
//前面我们说过,任务提交中invoke可以直接返回结果
long result = forkJoinPool.invoke(fockJoinTest);
System.out.println("fock/join计算结果耗时"+(System.currentTimeMillis() - fockhoinStartTime));
long sum = 0;
long normalStartTime = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
sum += i;
}
System.out.println("普通计算结果耗时"+(System.currentTimeMillis() - normalStartTime));
}
}
//执行结果:
//fock/join计算结果耗时33
//普通计算结果耗时141
注意,在上面的例子中,程序的效率其实首你设置的DEFAULT_CAPACITY影响的,如果你把这个容量值设置的太小,那么它会被分解成好多好多的子任务,那么效率反而会降低。但是把容量设置的稍微大一些效率也会相对的提升,经过测试,运行时间和DEFAULT_CAPCITY的关系大致如下图:
尾记
在我们的日常开发中,很多地方可以用分布式的方式去实现它,当然了这个是要建立你在资源很富余的情况之下。比如说,定时任务,半夜执行的时候,资源富足,那么我们可以用这种方式加快运算效率。再比如说,项目报表文件的导出,我们可以把超级多行的数据一部分一部分拆开出来,也可以达到加快效率的效果。大家可以尝试。
希望对你有所帮助。