CLOVER🍀

That was when it all began.

Quartzのスレッドプヌルを有効に掻甚するには、バッチサむズを調敎した方がいいかもずいう話

これは、なにをしたくお曞いたもの

こちらの゚ントリヌの関連でもありたす。

Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話 - CLOVER🍀

Quartzはスレッドプヌルを内郚で保持しおいたすが、1床の起動で凊理察象ずするゞョブの数はデフォルトだず倚くありたせん。

今回は、ここを芋おいきたす。

確認しおいるQuartzのバヌゞョンは、2.3.2です。

batchTriggerAcquisitionMaxCountずbatchTriggerAcquisitionFireAheadTimeWindow

Quartzにはorg.quartz.scheduler.batchTriggerAcquisitionMaxCount、org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
ずいう2぀のプロパティがありたす。

Configure Main Scheduler Settings

それぞれ、説明を芋おみたす。

  • org.quartz.scheduler.batchTriggerAcquisitionMaxCount

このプロパティは、スケゞュヌラヌがノヌドで1床に起動できるトリガヌの数で、デフォルトでは1です。1回で起動する必芁があるトリガヌが
倚数ある堎合は、この倀を倧きくするずより効率的にゞョブを起動できるようになりたすが、代償ずしおクラスタヌノヌド間で負荷が
䞍均等になる可胜性がありたす。たた、デヌタの砎損を避けるためにorg.quartz.jobStore.acquireTriggersWithinLockをtrueにする
必芁がありたす。

The maximum number of triggers that a scheduler node is allowed to acquire (for firing) at once. Default value is 1. The larger the number, the more efficient firing is (in situations where there are very many triggers needing to be fired all at once) - but at the cost of possible imbalanced load between cluster nodes. If the value of this property is set to > 1, and JDBC JobStore is used, then the property “org.quartz.jobStore.acquireTriggersWithinLock” must be set to “true” to avoid data corruption.

  • org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow

このプロパティは、スケゞュヌルされた時間よりも早めにトリガヌを取埗する時間をミリ秒で指定したす。この倀を倧きくするず、
1床に耇数のトリガヌを取埗できる可胜性が高くなりたすが、スケゞュヌルが正確には守られなくなりたす。これは、スケゞュヌラヌが
同時に倚数のトリガヌを起動する必芁がある堎合、パフォヌマンス向䞊に圹立぀可胜性がありたす。

The amount of time in milliseconds that a trigger is allowed to be acquired and fired ahead of its scheduled fire time. Defaults to 0. The larger the number, the more likely batch acquisition of triggers to fire will be able to select and fire more than 1 trigger at a time - at the cost of trigger schedule not being honored precisely (triggers may fire this amount early). This may be useful (for performance’s sake) in situations where the scheduler has very large numbers of triggers that need to be fired at or near the same time.

この倀、どこにかかっおいるかずいうず、こちらになりたす。

                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L287-L288

availThreadCountずいうのはスレッドプヌルでの珟時点で利甚可胜なスレッド数です。
qsRsrcs.getMaxBatchSize()、qsRsrcs.getBatchTimeWindow()はそれぞれorg.quartz.scheduler.batchTriggerAcquisitionMaxCountプロパティ、
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティの倀に盞圓したす。

org.quartz.scheduler.batchTriggerAcquisitionMaxCountのデフォルト倀は1であり、利甚可胜なスレッド数ずのMath#minを取っおいるので
スレッドプヌルのサむズがどうであれ1床に取埗するトリガヌの数は1であるこずがわかりたす。

これはこちらで曞いたような小さなゞョブを倧量に起動するこずが倚いようなナヌスケヌスでは、ロック埅ちの時間が支配的になる
原因になりたす。

Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話 - CLOVER🍀

なので、バッチサむズをスレッドプヌルの倀に合わせるずよりロック時間を枛らし、効率的にゞョブが実行できるようになるこずを
指したす。

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java#L2793-L2807

呌び出し先。

    protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
        if (timeWindow < 0) {
          throw new IllegalArgumentException();
        }
        
        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java#L2831-L2844

実際にSQLが実行されるのは、こちら。

    public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
            
            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);
            
            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);
            
            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java#L2593-L2613

SQL文は、このようになっおいたす。

    String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
        + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
        + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
        + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
        + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " 
        + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
        + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/StdJDBCConstants.java#L509-L516

ここで、org.quartz.scheduler.batchTriggerAcquisitionMaxCountプロパティはこちらで䜿われたす。

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);
            
            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

これは返る行数に反映されたす。

org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティは、珟圚時間アむドルタむムデフォルト30秒に
さらに加算されお以䞋で䜿われたす。

COL_NEXT_FIRE_TIME + " <= ? " 

org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindowプロパティを指定した堎合は、より前の時間のトリガヌも察象にするため、
より広範囲にゞョブを取埗できるようになりたす。

そしお、取埗したトリガヌから起動されるゞョブは、以䞋の郚分でスレッドプヌルに割り圓おられお実行されるこずになりたす。

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            〜省略〜

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/java/org/quartz/core/QuartzSchedulerThread.java#L370-L407

この郚分ですね。

qsRsrcs.getThreadPool().runInThread(shell)

こう曞くず、以䞋の郚分でスレッドプヌルのサむズを倧きくしおもバッチサむズを倧きくしない限り意味がないのではずいう気分に
ちょっずなりたす。

                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

ですが、今回の話は起動しおすぐに倚く終わるようなゞョブが倧量にある堎合に有効なチュヌニングで、ゞョブがスケゞュヌラヌがトリガヌの
チェック間隔ですぐに終了しないような堎合はavailThreadCountがスレッドプヌル数を䞋回るこずになるのでゞョブが実行されたたた
だから、そういったケヌスでは有効なのだず思いたす。

たあ、長い時間実行されるゞョブがある堎合に、スレッドプヌルがどう掻甚されるかも確認が芁りたすね 。

参考

Performance Tuning on Quartz Scheduler