CLOVER🍀

That was when it all began.

JDK 7のFork/Join Frameworkで遊ぶ

JDK 7のリリース前から気になっていた、Fork/Join Frameworkを触ってみました。かなり今更感があるのは、気にしない方向で…。

Fork/Join Frameworkって?

詳しくは、こちらへ。
http://itpro.nikkeibp.co.jp/article/COLUMN/20110527/360769/?ST=develop&P=1

要は、マルチコアを使い倒すための並列処理フレームワークです。対象にしているのは粒度の小さい(細粒度の)並列処理で、Java 5で導入されたExecutorを使ったフレームワークでは粒度が大きくなってしまうので向いていない…というお話だそうな。

とはいえ、たぶんFork/Join Frameworkは計算処理をメインの用途に想定していると思うので、IOを伴うような粒度の大きな処理はExecutorを使った方がいいのではないかと。

何を使う?

主に使用するのは、以下の2クラスです。

  • java.util.concurrent.ForkJoinPool
  • java.util.concurrent.ForkJoinTask

ForkJoinPoolクラスはスレッドプールを管理するクラスっぽいですが、何気にExecutor、ExecutorServiceインターフェースを実装していたりします。インスタンス生成時に並列度を指定できますが、デフォルトはマシンのプロセッサ数(Runtime#getRuntime()#availableProcessors())です。

主に使用するのは、以下のメソッドです。

  • public void execute(...) … 非同期処理を行い、結果を受け取らない(待ち合わせもしない)
  • public T invoke(...) … 処理が終わるまで待ち合わせを行い、結果を受け取る
  • public ForkJoinTask submit(...) … 非同期処理を行い、処理結果はタスクから受け取る

それぞれ、引数に渡せる型はメソッド毎に違うので、詳しくはAPIを参照のこと。

ForkJoinTaskクラスは、その名の通りタスクを表すクラスで、このサブクラスを作成することで並列処理を行うタスクを作成します。

が、実際にプログラムを書く時には直接ForkJoinTaskクラスを継承することはなさそうで、以下の2クラスを継承することになるようです。

  • java.util.concurrent.RecursiveAction
  • java.util.concurrent.RecursiveTask

この2つの使い分けは、RecursiveActionはタスクを実行した際の戻り値がない(Void)場合に使用し、RecursiveTaskは戻り値がある場合に使用します(よって、型パラメータVがあります)。

サンプル

リストの総和を計算するような、簡単なプログラムを作成してみました。

まずは、メインクラス。
ForkJoinSumExample.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumExample {
    private static final int LIST_LIMIT = 10000;

    public static void main(String[] args) {
        List<Integer> list = createList();

        ForkJoinPool pool = new ForkJoinPool();
        int result = pool.invoke(new SumTask(list));

        System.out.printf("sum result = %d, simple sum = %d%n", result, simpleSum());
    }

    public static int simpleSum() {
        int sum = 0;
        for (int i = 1; i <= LIST_LIMIT; i++) {
            sum += i;
        }

        return sum;
    }

    public static List<Integer> createList() {
        List<Integer> list = new ArrayList<>();

        for (int i = 1; i <= LIST_LIMIT; i++) {
            list.add(i);
        }

        return list;
    }
}

1〜10000までの要素が入った、リストの総和を計算するようになってます。一応普通に計算する版も付けて。

続いて、タスクの方を。リストの総和を戻すので、RecursiveTaskのサブクラスとして作成します。なお、このクラスを継承する場合はcomputeメソッドをオーバーライドします。
SumTask.java

import java.util.List;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Integer> {
    private static final int DEFAULT_PARALLEL_THRESHOLD = 16;

    private List<Integer> list;
    private int threshold;

    public SumTask(List<Integer> list) {
        this(list, DEFAULT_PARALLEL_THRESHOLD);
    }

    public SumTask(List<Integer> list, int threshold) {
        this.list = list;
        this.threshold = threshold;
    }

    @Override
    protected Integer compute() {
        if (list.size() < threshold) {
            int sum = 0;
            for (Integer i : list) {
                sum += i;
            }

            return sum;
        } else {
            int half = list.size() / 2;

            SumTask st1 = new SumTask(list.subList(0, half), threshold);
            st1.fork();

            SumTask st2 = new SumTask(list.subList(half, list.size()), threshold);

            return st2.compute() + st1.join();
        }
    }
}

リストの要素数が16を超えている場合は分割してタスク実行し、そうでなければ自分で計算を行います。注目点は、ここで

            SumTask st1 = new SumTask(list.subList(0, half), threshold);
            st1.fork();

            SumTask st2 = new SumTask(list.subList(half, list.size()), threshold);

            return st2.compute() + st1.join();

新たにタスクを生成して、最初のタスクをforkして並列実行して続いて2つ目のタスクは普通に計算(compute)します。最後に最初のタスクをjoinで待ち合わせて結果を戻すというやり方です。

fork、joinメソッドを使用すると並列処理の開始と待ち合わせですが、computeメソッドの呼び出しって自分が書いた計算処理を呼ぶだけですからね。もちろん、その中が並列実行している可能性は十分にありますが。

けっこう大変なのかなと思っていましたが、サンプル程度だと割と簡単でした。

個人的にはこの分野に興味はあるのですが、仕事で使うかなぁ…?

あ、一応実行結果です。

> run-main ForkJoinSumExample
[info] Running ForkJoinSumExample 
sum result = 50005000, simple sum = 50005000
[success] Total time: 0 s, completed 2012/09/23 19:08:59

後で時間も測ってみましたけど、この程度の計算処理だと並列実行によるオーバーヘッドの方が大きかったです…。