001// Licensed under the Apache License, Version 2.0 (the "License"); 002// you may not use this file except in compliance with the License. 003// You may obtain a copy of the License at 004// 005// http://www.apache.org/licenses/LICENSE-2.0 006// 007// Unless required by applicable law or agreed to in writing, software 008// distributed under the License is distributed on an "AS IS" BASIS, 009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 010// See the License for the specific language governing permissions and 011// limitations under the License. 012 013package org.apache.tapestry5.ioc.internal.services.cron; 014 015import org.apache.tapestry5.ioc.Invokable; 016import org.apache.tapestry5.ioc.annotations.PostInjection; 017import org.apache.tapestry5.ioc.internal.util.CollectionFactory; 018import org.apache.tapestry5.ioc.services.ParallelExecutor; 019import org.apache.tapestry5.ioc.services.RegistryShutdownHub; 020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor; 021import org.apache.tapestry5.ioc.services.cron.PeriodicJob; 022import org.apache.tapestry5.ioc.services.cron.Schedule; 023import org.slf4j.Logger; 024 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Set; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.concurrent.locks.Lock; 031import java.util.concurrent.locks.ReentrantLock; 032 033public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable 034{ 035 private final ParallelExecutor parallelExecutor; 036 037 private final Logger logger; 038 039 // Synchronized by jobLock 040 private final List<Job> jobs = CollectionFactory.newList(); 041 042 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor"); 043 044 private transient boolean shutdown; 045 046 private static final long FIVE_MINUTES = 5 * 60 * 1000; 047 048 private final AtomicInteger jobIdAllocator = new AtomicInteger(); 049 050 private final Lock jobLock = new ReentrantLock(); 051 052 private class Job implements PeriodicJob, Invokable<Void> 053 { 054 final int jobId = jobIdAllocator.incrementAndGet(); 055 056 private final Schedule schedule; 057 058 private final String name; 059 060 private final Runnable runnableJob; 061 062 private boolean executing, canceled; 063 064 private long nextExecution; 065 066 public Job(Schedule schedule, String name, Runnable runnableJob) 067 { 068 this.schedule = schedule; 069 this.name = name; 070 this.runnableJob = runnableJob; 071 072 nextExecution = schedule.firstExecution(); 073 } 074 075 @Override 076 public String getName() 077 { 078 return name; 079 } 080 081 public long getNextExecution() 082 { 083 try 084 { 085 jobLock.lock(); 086 return nextExecution; 087 } finally 088 { 089 jobLock.unlock(); 090 } 091 } 092 093 094 @Override 095 public boolean isExecuting() 096 { 097 try 098 { 099 jobLock.lock(); 100 return executing; 101 } finally 102 { 103 jobLock.unlock(); 104 } 105 } 106 107 @Override 108 public boolean isCanceled() 109 { 110 try 111 { 112 jobLock.lock(); 113 return canceled; 114 } finally 115 { 116 jobLock.unlock(); 117 } 118 } 119 120 @Override 121 public void cancel() 122 { 123 try 124 { 125 jobLock.lock(); 126 127 canceled = true; 128 129 if (!executing) 130 { 131 removeJob(this); 132 } 133 134 // Otherwise, it will be caught when the job finishes execution. 135 } finally 136 { 137 jobLock.unlock(); 138 } 139 } 140 141 @Override 142 public String toString() 143 { 144 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId); 145 146 builder.append(", (").append(name).append(')'); 147 148 if (executing) 149 { 150 builder.append(", executing"); 151 } 152 153 if (canceled) 154 { 155 builder.append(", canceled"); 156 } else 157 { 158 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution)); 159 } 160 161 return builder.append(']').toString(); 162 } 163 164 /** 165 * Starts execution of the job; this sets the executing flag, calculates the next execution time, 166 * and uses the ParallelExecutor to run the job. 167 */ 168 void start() 169 { 170 try 171 { 172 jobLock.lock(); 173 executing = true; 174 175 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options 176 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing 177 // overlapping executions of the Job on a more rigid schedule. Use Quartz. 178 179 nextExecution = schedule.nextExecution(nextExecution); 180 181 parallelExecutor.invoke(this); 182 } finally 183 { 184 jobLock.unlock(); 185 } 186 187 if (logger.isTraceEnabled()) 188 { 189 logger.trace(this + " sent for execution"); 190 } 191 } 192 193 void cleanupAfterExecution() 194 { 195 try 196 { 197 if (logger.isTraceEnabled()) 198 { 199 logger.trace(this + " execution complete"); 200 } 201 202 executing = false; 203 204 if (canceled) 205 { 206 removeJob(this); 207 } else 208 { 209 // Again, naive but necessary. 210 thread.interrupt(); 211 } 212 } finally 213 { 214 jobLock.unlock(); 215 } 216 } 217 218 @Override 219 public Void invoke() 220 { 221 if (logger.isDebugEnabled()) 222 { 223 logger.debug(String.format("Executing job #%d (%s)", jobId, name)); 224 } 225 226 try 227 { 228 runnableJob.run(); 229 } finally 230 { 231 cleanupAfterExecution(); 232 } 233 234 return null; 235 } 236 237 } 238 239 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger) 240 { 241 this.parallelExecutor = parallelExecutor; 242 this.logger = logger; 243 } 244 245 @PostInjection 246 public void start(RegistryShutdownHub hub) 247 { 248 hub.addRegistryShutdownListener(new Runnable() 249 { 250 @Override 251 public void run() 252 { 253 registryDidShutdown(); 254 } 255 }); 256 257 thread.start(); 258 } 259 260 261 void removeJob(Job job) 262 { 263 if (logger.isDebugEnabled()) 264 { 265 logger.debug("Removing " + job); 266 } 267 268 try 269 { 270 jobLock.lock(); 271 jobs.remove(job); 272 } finally 273 { 274 jobLock.unlock(); 275 } 276 } 277 278 279 @Override 280 public PeriodicJob addJob(Schedule schedule, String name, Runnable job) 281 { 282 assert schedule != null; 283 assert name != null; 284 assert job != null; 285 286 Job periodicJob = new Job(schedule, name, job); 287 288 try 289 { 290 jobLock.lock(); 291 292 jobs.add(periodicJob); 293 } finally 294 { 295 jobLock.unlock(); 296 } 297 298 if (logger.isDebugEnabled()) 299 { 300 logger.debug("Added " + periodicJob); 301 } 302 303 // Wake the thread so that it can start the job, if necessary. 304 305 // Technically, this is only necessary if the new job is scheduled earlier 306 // than any job currently in the list of jobs, but this naive implementation 307 // is simpler. 308 thread.interrupt(); 309 310 return periodicJob; 311 } 312 313 @Override 314 public void run() 315 { 316 while (!shutdown) 317 { 318 long nextExecution = executeCurrentBatch(); 319 320 try 321 { 322 long delay = nextExecution - System.currentTimeMillis(); 323 324 if (logger.isTraceEnabled()) 325 { 326 logger.trace(String.format("Sleeping for %,d ms", delay)); 327 } 328 329 if (delay > 0) 330 { 331 Thread.sleep(delay); 332 } 333 } catch (InterruptedException 334 ex) 335 { 336 // Ignored; the thread is interrupted() to shut it down, 337 // or to have it execute a new batch. 338 339 logger.trace("Interrupted"); 340 } 341 } 342 } 343 344 private void registryDidShutdown() 345 { 346 shutdown = true; 347 348 thread.interrupt(); 349 } 350 351 /** 352 * Finds jobs and executes jobs that are ready to be executed. 353 * 354 * @return the next execution time (from the non-executing job that is scheduled earliest for execution). 355 */ 356 private long executeCurrentBatch() 357 { 358 long now = System.currentTimeMillis(); 359 long nextExecution = now + FIVE_MINUTES; 360 361 try 362 { 363 jobLock.lock(); 364 // TAP5-2455 365 Set<Job> jobsToCancel = null; 366 367 for (Job job : jobs) 368 { 369 if (job.isExecuting()) 370 { 371 continue; 372 } 373 374 long jobNextExecution = job.getNextExecution(); 375 376 if (jobNextExecution <= 0) 377 { 378 if (jobsToCancel == null) 379 { 380 jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>(); 381 } 382 jobsToCancel.add(job); 383 } else if (jobNextExecution <= now) 384 { 385 job.start(); 386 } else 387 { 388 nextExecution = Math.min(nextExecution, jobNextExecution); 389 } 390 } 391 if (jobsToCancel != null) 392 { 393 for (Job job : jobsToCancel) 394 { 395 job.cancel(); 396 } 397 } 398 } finally 399 { 400 jobLock.unlock(); 401 } 402 403 return nextExecution; 404 } 405 406 407}