001 // Copyright 2011 The Apache Software Foundation 002 // 003 // Licensed under the Apache License, Version 2.0 (the "License"); 004 // you may not use this file except in compliance with the License. 005 // You may obtain a copy of the License at 006 // 007 // http://www.apache.org/licenses/LICENSE-2.0 008 // 009 // Unless required by applicable law or agreed to in writing, software 010 // distributed under the License is distributed on an "AS IS" BASIS, 011 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 012 // See the License for the specific language governing permissions and 013 // limitations under the License. 014 015 package org.apache.tapestry5.ioc.internal.services.cron; 016 017 import org.apache.tapestry5.ioc.Invokable; 018 import org.apache.tapestry5.ioc.annotations.PostInjection; 019 import org.apache.tapestry5.ioc.internal.util.CollectionFactory; 020 import org.apache.tapestry5.ioc.services.ParallelExecutor; 021 import org.apache.tapestry5.ioc.services.RegistryShutdownHub; 022 import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor; 023 import org.apache.tapestry5.ioc.services.cron.PeriodicJob; 024 import org.apache.tapestry5.ioc.services.cron.Schedule; 025 import org.slf4j.Logger; 026 027 import java.util.List; 028 import java.util.concurrent.atomic.AtomicInteger; 029 030 public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable 031 { 032 private final ParallelExecutor parallelExecutor; 033 034 private final Logger logger; 035 036 // Synchronized by this 037 private final List<Job> jobs = CollectionFactory.newList(); 038 039 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor"); 040 041 // Synchronized by this. Set when the registry is shutdown. 042 private boolean shutdown; 043 044 private static final long FIVE_MINUTES = 5 * 60 * 1000; 045 046 private final AtomicInteger jobIdAllocator = new AtomicInteger(); 047 048 private class Job implements PeriodicJob, Invokable<Void> 049 { 050 final int jobId = jobIdAllocator.incrementAndGet(); 051 052 private final Schedule schedule; 053 054 private final String name; 055 056 private final Runnable runnableJob; 057 058 private boolean executing, canceled; 059 060 private long nextExecution; 061 062 public Job(Schedule schedule, String name, Runnable runnableJob) 063 { 064 this.schedule = schedule; 065 this.name = name; 066 this.runnableJob = runnableJob; 067 068 nextExecution = schedule.firstExecution(); 069 } 070 071 public String getName() 072 { 073 return name; 074 } 075 076 public synchronized long getNextExecution() 077 { 078 return nextExecution; 079 } 080 081 082 public synchronized boolean isExecuting() 083 { 084 return executing; 085 } 086 087 public synchronized boolean isCanceled() 088 { 089 return canceled; 090 } 091 092 public synchronized void cancel() 093 { 094 canceled = true; 095 096 if (!executing) 097 { 098 removeJob(this); 099 } 100 101 // Otherwise, it will be caught when the job finishes execution. 102 } 103 104 @Override 105 public synchronized String toString() 106 { 107 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId); 108 109 110 builder.append(", (").append(name).append(")"); 111 112 if (executing) 113 { 114 builder.append(", executing"); 115 } 116 117 if (canceled) 118 { 119 builder.append(", canceled"); 120 } else 121 { 122 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution)); 123 } 124 125 return builder.append("]").toString(); 126 } 127 128 /** 129 * Starts execution of the job; this sets the executing flag, calculates the next execution time, 130 * and uses the ParallelExecutor to run the job. 131 */ 132 synchronized void start() 133 { 134 executing = true; 135 136 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options 137 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing 138 // overlapping executions of the Job on a more rigid schedule. Use Quartz. 139 140 nextExecution = schedule.nextExecution(nextExecution); 141 142 parallelExecutor.invoke(this); 143 144 if (logger.isTraceEnabled()) 145 { 146 logger.trace(this + " sent for execution"); 147 } 148 } 149 150 synchronized void cleanupAfterExecution() 151 { 152 if (logger.isTraceEnabled()) 153 { 154 logger.trace(this + " execution complete"); 155 } 156 157 executing = false; 158 159 if (canceled) 160 { 161 removeJob(this); 162 } else 163 { 164 // Again, naive but necessary. 165 thread.interrupt(); 166 } 167 } 168 169 public Void invoke() 170 { 171 if (logger.isDebugEnabled()) 172 { 173 logger.debug(String.format("Executing job #%d (%s)", jobId, name)); 174 } 175 176 try 177 { 178 runnableJob.run(); 179 } finally 180 { 181 cleanupAfterExecution(); 182 } 183 184 return null; 185 } 186 } 187 188 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger) 189 { 190 this.parallelExecutor = parallelExecutor; 191 this.logger = logger; 192 } 193 194 @PostInjection 195 public void start(RegistryShutdownHub hub) 196 { 197 hub.addRegistryShutdownListener(new Runnable() 198 { 199 public void run() 200 { 201 registryDidShutdown(); 202 } 203 }); 204 205 thread.start(); 206 } 207 208 209 synchronized void removeJob(Job job) 210 { 211 if (logger.isDebugEnabled()) 212 { 213 logger.debug("Removing " + job); 214 } 215 216 jobs.remove(job); 217 } 218 219 220 public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job) 221 { 222 assert schedule != null; 223 assert name != null; 224 assert job != null; 225 226 Job periodicJob = new Job(schedule, name, job); 227 228 jobs.add(periodicJob); 229 230 if (logger.isDebugEnabled()) 231 { 232 logger.debug("Added " + periodicJob); 233 } 234 235 // Wake the thread so that it can start the job, if necessary. 236 237 // Technically, this is only necessary if the new job is scheduled earlier 238 // than any job currently in the list of jobs, but this naive implementation 239 // is simpler. 240 thread.interrupt(); 241 242 return periodicJob; 243 } 244 245 public void run() 246 { 247 while (!isShutdown()) 248 { 249 long nextExecution = executeCurrentBatch(); 250 251 try 252 { 253 long delay = nextExecution - System.currentTimeMillis(); 254 255 if (logger.isTraceEnabled()) 256 { 257 logger.trace(String.format("Sleeping for %,d ms", delay)); 258 } 259 260 if (delay > 0) 261 { 262 Thread.sleep(delay); 263 } 264 } catch (InterruptedException 265 ex) 266 { 267 // Ignored; the thread is interrupted() to shut it down, 268 // or to have it execute a new batch. 269 270 logger.trace("Interrupted"); 271 } 272 } 273 } 274 275 private synchronized boolean isShutdown() 276 { 277 return shutdown; 278 } 279 280 private synchronized void registryDidShutdown() 281 { 282 shutdown = true; 283 284 thread.interrupt(); 285 } 286 287 /** 288 * Finds jobs and executes jobs that are ready to be executed. 289 * 290 * @return the next execution time (from the non-executing job that is scheduled earliest for execution). 291 */ 292 private synchronized long executeCurrentBatch() 293 { 294 long now = System.currentTimeMillis(); 295 long nextExecution = now + FIVE_MINUTES; 296 297 for (Job job : jobs) 298 { 299 if (job.isExecuting()) 300 { 301 continue; 302 } 303 304 long jobNextExecution = job.getNextExecution(); 305 306 if (jobNextExecution <= now) 307 { 308 job.start(); 309 } else 310 { 311 nextExecution = Math.min(nextExecution, jobNextExecution); 312 } 313 } 314 315 return nextExecution; 316 } 317 318 319 }