CLOVER🍀

That was when it all began.

Quartzのスレッドプールを有効に活用するには、バッチサイズを調整した方がいいかもという話

これは、なにをしたくて書いたもの?

こちらのエントリーの関連でもあります。

Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話 - CLOVER🍀

Quartzはスレッドプールを内部で保持していますが、1度の起動で処理対象とするジョブの数はデフォルトだと多くありません。

今回は、ここを見ていきます。

確認しているQuartzのバージョンは、2.3.2です。

batchTriggerAcquisitionMaxCountとbatchTriggerAcquisitionFireAheadTimeWindow

Quartzにはorg.quartz.scheduler.batchTriggerAcquisitionMaxCount、org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
という2つのプロパティがあります。

Configure Main Scheduler Settings

それぞれ、説明を見てみます。

  • org.quartz.scheduler.batchTriggerAcquisitionMaxCount

このプロパティは、スケジューラーがノードで1度に起動できるトリガーの数で、デフォルトでは1です。1回で起動する必要があるトリガーが
多数ある場合は、この値を大きくするとより効率的にジョブを起動できるようになりますが、代償としてクラスターノード間で負荷が
不均等になる可能性があります。また、データの破損を避けるためにorg.quartz.jobStore.acquireTriggersWithinLockをtrueにする
必要があります。

The maximum number of triggers that a scheduler node is allowed to acquire (for firing) at once. Default value is 1. The larger the number, the more efficient firing is (in situations where there are very many triggers needing to be fired all at once) - but at the cost of possible imbalanced load between cluster nodes. If the value of this property is set to > 1, and JDBC JobStore is used, then the property “org.quartz.jobStore.acquireTriggersWithinLock” must be set to “true” to avoid data corruption.

  • org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow

このプロパティは、スケジュールされた時間よりも早めにトリガーを取得する時間をミリ秒で指定します。この値を大きくすると、
1度に複数のトリガーを取得できる可能性が高くなりますが、スケジュールが正確には守られなくなります。これは、スケジューラーが
同時に多数のトリガーを起動する必要がある場合、パフォーマンス向上に役立つ可能性があります。

The amount of time in milliseconds that a trigger is allowed to be acquired and fired ahead of its scheduled fire time. Defaults to 0. The larger the number, the more likely batch acquisition of triggers to fire will be able to select and fire more than 1 trigger at a time - at the cost of trigger schedule not being honored precisely (triggers may fire this amount early). This may be useful (for performance’s sake) in situations where the scheduler has very large numbers of triggers that need to be fired at or near the same time.

この値、どこにかかっているかというと、こちらになります。

                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L287-L288

availThreadCountというのはスレッドプールでの現時点で利用可能なスレッド数です。
qsRsrcs.getMaxBatchSize()、qsRsrcs.getBatchTimeWindow()はそれぞれorg.quartz.scheduler.batchTriggerAcquisitionMaxCountプロパティ、
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティの値に相当します。

org.quartz.scheduler.batchTriggerAcquisitionMaxCountのデフォルト値は1であり、利用可能なスレッド数とのMath#minを取っているので
スレッドプールのサイズがどうであれ1度に取得するトリガーの数は1であることがわかります。

これはこちらで書いたような小さなジョブを大量に起動することが多いようなユースケースでは、ロック待ちの時間が支配的になる
原因になります。

Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話 - CLOVER🍀

なので、バッチサイズをスレッドプールの値に合わせるとよりロック時間を減らし、効率的にジョブが実行できるようになることを
指します。

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java#L2793-L2807

呼び出し先。

    protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
        if (timeWindow < 0) {
          throw new IllegalArgumentException();
        }
        
        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java#L2831-L2844

実際にSQLが実行されるのは、こちら。

    public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
            
            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);
            
            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);
            
            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java#L2593-L2613

SQL文は、このようになっています。

    String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
        + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
        + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
        + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
        + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " 
        + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
        + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java#L509-L516

ここで、org.quartz.scheduler.batchTriggerAcquisitionMaxCountプロパティはこちらで使われます。

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);
            
            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

これは返る行数に反映されます。

org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティは、現在時間+アイドルタイム(デフォルト30秒)に
さらに加算されて以下で使われます。

COL_NEXT_FIRE_TIME + " <= ? " 

org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティを指定した場合は、より前の時間のトリガーも対象にするため、
より広範囲にジョブを取得できるようになります。

そして、取得したトリガーから起動されるジョブは、以下の部分でスレッドプールに割り当てられて実行されることになります。

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            〜省略〜

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L370-L407

この部分ですね。

qsRsrcs.getThreadPool().runInThread(shell)

こう書くと、以下の部分でスレッドプールのサイズを大きくしてもバッチサイズを大きくしない限り意味がないのでは?という気分に
ちょっとなります。

                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

ですが、今回の話は起動してすぐに多く終わるようなジョブが大量にある場合に有効なチューニングで、ジョブがスケジューラーがトリガーの
チェック間隔ですぐに終了しないような場合はavailThreadCountがスレッドプール数を下回ることになるので(ジョブが実行されたまま
だから)、そういったケースでは有効なのだと思います。

まあ、長い時間実行されるジョブがある場合に、スレッドプールがどう活用されるかも確認が要りますね…。

参考)

Performance Tuning on Quartz Scheduler