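What did I want to do in writing this?
This is also related to the following entry.
Quartz clustering does not scale when running a large number of small jobs (Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話) - CLOVER🍀
Quartz keeps a thread pool internally, but by default the number of jobs it picks up for processing in a single firing pass is not large.
This time I will look into that.
The Quartz version I am checking is 2.3.2.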
batchTriggerAcquisitionMaxCount and batchTriggerAcquisitionFireAheadTimeWindow
Quartz has two properties: org.quartz.scheduler.batchTriggerAcquisitionMaxCount and org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow.
Configure Main Scheduler Settings
Let's look at the description of each.
org.quartz.scheduler.batchTriggerAcquisitionMaxCount
This property is the maximum number of triggers that a scheduler node may acquire for firing at once. The default is 1. When many triggers need to fire at the same time, increasing this value makes firing more efficient, at the cost of possibly imbalanced load between cluster nodes. In addition, if the value is greater than 1 and the JDBC JobStore is used, org.quartz.jobStore.acquireTriggersWithinLock must be set to true to avoid data corruption.
The maximum number of triggers that a scheduler node is allowed to acquire (for firing) at once. Default value is 1. The larger the number, the more efficient firing is (in situations where there are very many triggers needing to be fired all at once) - but at the cost of possible imbalanced load between cluster nodes. If the value of this property is set to > 1, and JDBC JobStore is used, then the property "org.quartz.jobStore.acquireTriggersWithinLock" must be set to "true" to avoid data corruption.
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
This property specifies, in milliseconds, how far ahead of its scheduled fire time a trigger may be acquired. The default is 0. Increasing this value makes it more likely that several triggers can be acquired at once, but the schedule is no longer honored precisely, since triggers may fire early by up to this amount. It can help performance when the scheduler has to fire a large number of triggers at or near the same time.
The amount of time in milliseconds that a trigger is allowed to be acquired and fired ahead of its scheduled fire time. Defaults to 0. The larger the number, the more likely batch acquisition of triggers to fire will be able to select and fire more than 1 trigger at a time - at the cost of trigger schedule not being honored precisely (triggers may fire this amount early). This may be useful (for performance's sake) in situations where the scheduler has very large numbers of triggers that need to be fired at or near the same time.
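As a rough illustration of the two properties described above, the following sketch builds a java.util.Properties object that sets them together with the thread pool size. The class name QuartzBatchProps, the build method, and the values 10 and 1000 are assumptions made for this example and do not come from Quartz or from this entry. Only the property keys are taken from the Quartz configuration documentation.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Quartz.
class QuartzBatchProps {

    /** Builds scheduler settings that match the batch size to the thread pool size. */
    static Properties build(int threadCount, long fireAheadMillis) {
        Properties props = new Properties();
        // Size of the Quartz worker thread pool.
        props.setProperty("org.quartz.threadPool.threadCount", String.valueOf(threadCount));
        // Allow one acquisition pass to pick up as many triggers as there are threads (default: 1).
        props.setProperty("org.quartz.scheduler.batchTriggerAcquisitionMaxCount", String.valueOf(threadCount));
        // Allow triggers to be acquired this many milliseconds early (default: 0).
        props.setProperty("org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow",
                String.valueOf(fireAheadMillis));
        // Per the documentation, needed when the batch size is > 1 and the JDBC JobStore is used.
        props.setProperty("org.quartz.jobStore.acquireTriggersWithinLock", "true");
        return props;
    }

    public static void main(String[] args) {
        // The values 10 and 1000 are arbitrary example values.
        build(10, 1000L).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With Quartz on the classpath, a Properties object like this could be handed to StdSchedulerFactory, or the same keys could be written in quartz.properties. A real setup also needs the JobStore and data source settings, which are left out here.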
Here is where these values take effect in the code.
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
availThreadCount is the number of threads currently available in the thread pool.
qsRsrcs.getMaxBatchSize() and qsRsrcs.getBatchTimeWindow() correspond to the values of the org.quartz.scheduler.batchTriggerAcquisitionMaxCount and org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow properties, respectively.
The default value of org.quartz.scheduler.batchTriggerAcquisitionMaxCount is 1, and because Math#min is taken against the number of available threads, only one trigger is acquired at a time no matter how large the thread pool is.
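In a use case like the one I wrote about in the entry below, where large numbers of small jobs are fired, this causes lock wait time to become dominant.
Quartz clustering does not scale when running a large number of small jobs (Quartzのクラスタリングは、大量の小さなジョブを実行する場合はスケールしないという話) - CLOVER🍀
So matching the batch size to the thread pool size means less time spent on locking and more efficient job execution.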
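To make the effect of the batch size concrete, here is a deliberately simplified model, not Quartz code. It assumes that every job finishes instantly, so all threads are free again before each acquisition pass, and that each pass takes the lock once. The class AcquisitionRounds and its methods are hypothetical names. Only the Math.min expression mirrors the scheduler loop quoted above.

```java
// Simplified model for illustration only; not part of Quartz.
class AcquisitionRounds {

    /** Mirrors Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) from the scheduler loop. */
    static int effectiveBatchSize(int availThreadCount, int maxBatchSize) {
        return Math.min(availThreadCount, maxBatchSize);
    }

    /**
     * Number of acquisition passes needed to pick up all due triggers,
     * assuming all threads are available again before every pass.
     */
    static int roundsNeeded(int dueTriggers, int availThreadCount, int maxBatchSize) {
        int batch = effectiveBatchSize(availThreadCount, maxBatchSize);
        return (dueTriggers + batch - 1) / batch; // ceiling division
    }

    public static void main(String[] args) {
        // 100 due triggers, 10 threads, default batch size of 1: prints 100
        System.out.println(roundsNeeded(100, 10, 1));
        // Same load with the batch size matched to the thread pool: prints 10
        System.out.println(roundsNeeded(100, 10, 10));
    }
}
```

Under these assumptions, the number of passes, and with it the number of lock acquisitions, drops by the same factor as the batch size grows. Real behavior also depends on job duration and on contention between cluster nodes.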
    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
The method it calls:
    protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
        if (timeWindow < 0) {
          throw new IllegalArgumentException();
        }

        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
The SQL is actually executed here:
    public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);

            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();
The SQL statement looks like this.
    String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
        + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
        + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
        + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
        + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
        + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
        + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
The org.quartz.scheduler.batchTriggerAcquisitionMaxCount property is used here.
    // Set max rows to retrieve
    if (maxCount < 1)
        maxCount = 1; // we want at least one trigger back.
    ps.setMaxRows(maxCount);

    // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
    // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
    ps.setFetchSize(maxCount);
In other words, it determines the number of rows the query returns.
The org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow property is added on top of the current time plus the idle wait time (30 seconds by default), and the result is bound to the following condition.
    COL_NEXT_FIRE_TIME + " <= ? "
When org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow is specified, triggers scheduled further in the future are also included, so jobs can be acquired over a wider range.
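The upper bound used in that comparison can be sketched as follows. This is a simplified illustration, not Quartz code: it models only the `NEXT_FIRE_TIME <= ?` condition and ignores the trigger state, the misfire lower bound, priority ordering, and the row limit. The class FireAheadWindow and its methods are hypothetical names, and the times are plain example numbers in milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model for illustration only; not part of Quartz.
class FireAheadWindow {

    /** Upper bound compared against the next fire time: now + idle wait time + time window. */
    static long upperBound(long now, long idleWaitTime, long timeWindow) {
        return now + idleWaitTime + timeWindow;
    }

    /** Returns the fire times that fall at or before the upper bound, earliest first. */
    static List<Long> candidates(List<Long> nextFireTimes, long now, long idleWaitTime, long timeWindow) {
        long bound = upperBound(now, idleWaitTime, timeWindow);
        return nextFireTimes.stream()
                .filter(t -> t <= bound)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> fireTimes = Arrays.asList(10_000L, 31_000L, 40_000L);
        // Time window of 0 (the default): only the trigger at 10000 qualifies.
        System.out.println(candidates(fireTimes, 0L, 30_000L, 0L));
        // Time window of 5000 ms: the trigger at 31000 qualifies as well.
        System.out.println(candidates(fireTimes, 0L, 30_000L, 5_000L));
    }
}
```

In this model, a larger window only widens the set of candidate triggers. How many of them are actually acquired in one pass is still capped by the batch size.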
The jobs fired from the acquired triggers are then assigned to the thread pool and executed in the following part.
    for (int i = 0; i < bndles.size(); i++) {
        TriggerFiredResult result =  bndles.get(i);
        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
        Exception exception = result.getException();

        // (omitted)

        JobRunShell shell = null;
        try {
            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
            shell.initialize(qs);
        } catch (SchedulerException se) {
            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            continue;
        }

        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
            // this case should never happen, as it is indicative of the
            // scheduler being shutdown or a bug in the thread pool or
            // a thread pool being used concurrently - which the docs
            // say not to do...
            getLog().error("ThreadPool.runInThread() return false!");
            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        }
    }
That is, this part.
    qsRsrcs.getThreadPool().runInThread(shell)
Put this way, and given the following part, it starts to feel a little as though enlarging the thread pool is pointless unless the batch size is enlarged too.
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
However, the tuning discussed here is effective when there are large numbers of jobs that finish right after they start. When jobs do not finish within the scheduler's trigger check interval, availThreadCount falls below the thread pool size (because those jobs are still running), so I think the thread pool size does matter in such cases.
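The point about availThreadCount can be illustrated with another small model, again not Quartz code. It assumes that the number of available threads is simply the pool size minus the number of threads still running jobs, and that at least one thread is free. The class BusyThreads and its method are hypothetical names.

```java
// Simplified model for illustration only; not part of Quartz.
class BusyThreads {

    /**
     * Number of triggers one acquisition pass may take when some threads are
     * still occupied by running jobs. Assumes busyThreads < poolSize.
     */
    static int acquirable(int poolSize, int busyThreads, int maxBatchSize) {
        int availThreadCount = poolSize - busyThreads;
        return Math.min(availThreadCount, maxBatchSize);
    }

    public static void main(String[] args) {
        // Pool of 10, batch size 10, 7 threads busy: prints 3
        System.out.println(acquirable(10, 7, 10));
        // Pool of 20, batch size 10, 7 threads busy: prints 10
        System.out.println(acquirable(20, 7, 10));
    }
}
```

In this model, a pool larger than the batch size leaves enough free threads to fill a whole batch even while some jobs are still running, which is one way to read the observation above.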
That said, how the thread pool is actually used when there are long-running jobs still needs to be verified.
References: