001 // Copyright 2011 The Apache Software Foundation
002 //
003 // Licensed under the Apache License, Version 2.0 (the "License");
004 // you may not use this file except in compliance with the License.
005 // You may obtain a copy of the License at
006 //
007 // http://www.apache.org/licenses/LICENSE-2.0
008 //
009 // Unless required by applicable law or agreed to in writing, software
010 // distributed under the License is distributed on an "AS IS" BASIS,
011 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012 // See the License for the specific language governing permissions and
013 // limitations under the License.
014
015 package org.apache.tapestry5.ioc.internal.services.cron;
016
017 import org.apache.tapestry5.ioc.Invokable;
018 import org.apache.tapestry5.ioc.annotations.PostInjection;
019 import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
020 import org.apache.tapestry5.ioc.services.ParallelExecutor;
021 import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
022 import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
023 import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
024 import org.apache.tapestry5.ioc.services.cron.Schedule;
025 import org.slf4j.Logger;
026
027 import java.util.List;
028 import java.util.concurrent.atomic.AtomicInteger;
029
030 public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
031 {
032 private final ParallelExecutor parallelExecutor;
033
034 private final Logger logger;
035
036 // Synchronized by this
037 private final List<Job> jobs = CollectionFactory.newList();
038
039 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
040
041 // Synchronized by this. Set when the registry is shutdown.
042 private boolean shutdown;
043
044 private static final long FIVE_MINUTES = 5 * 60 * 1000;
045
046 private final AtomicInteger jobIdAllocator = new AtomicInteger();
047
048 private class Job implements PeriodicJob, Invokable<Void>
049 {
050 final int jobId = jobIdAllocator.incrementAndGet();
051
052 private final Schedule schedule;
053
054 private final String name;
055
056 private final Runnable runnableJob;
057
058 private boolean executing, canceled;
059
060 private long nextExecution;
061
062 public Job(Schedule schedule, String name, Runnable runnableJob)
063 {
064 this.schedule = schedule;
065 this.name = name;
066 this.runnableJob = runnableJob;
067
068 nextExecution = schedule.firstExecution();
069 }
070
071 public String getName()
072 {
073 return name;
074 }
075
076 public synchronized long getNextExecution()
077 {
078 return nextExecution;
079 }
080
081
082 public synchronized boolean isExecuting()
083 {
084 return executing;
085 }
086
087 public synchronized boolean isCanceled()
088 {
089 return canceled;
090 }
091
092 public synchronized void cancel()
093 {
094 canceled = true;
095
096 if (!executing)
097 {
098 removeJob(this);
099 }
100
101 // Otherwise, it will be caught when the job finishes execution.
102 }
103
104 @Override
105 public synchronized String toString()
106 {
107 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
108
109
110 builder.append(", (").append(name).append(")");
111
112 if (executing)
113 {
114 builder.append(", executing");
115 }
116
117 if (canceled)
118 {
119 builder.append(", canceled");
120 } else
121 {
122 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
123 }
124
125 return builder.append("]").toString();
126 }
127
128 /**
129 * Starts execution of the job; this sets the executing flag, calculates the next execution time,
130 * and uses the ParallelExecutor to run the job.
131 */
132 synchronized void start()
133 {
134 executing = true;
135
136 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
137 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
138 // overlapping executions of the Job on a more rigid schedule. Use Quartz.
139
140 nextExecution = schedule.nextExecution(nextExecution);
141
142 parallelExecutor.invoke(this);
143
144 if (logger.isTraceEnabled())
145 {
146 logger.trace(this + " sent for execution");
147 }
148 }
149
150 synchronized void cleanupAfterExecution()
151 {
152 if (logger.isTraceEnabled())
153 {
154 logger.trace(this + " execution complete");
155 }
156
157 executing = false;
158
159 if (canceled)
160 {
161 removeJob(this);
162 } else
163 {
164 // Again, naive but necessary.
165 thread.interrupt();
166 }
167 }
168
169 public Void invoke()
170 {
171 if (logger.isDebugEnabled())
172 {
173 logger.debug(String.format("Executing job #%d (%s)", jobId, name));
174 }
175
176 try
177 {
178 runnableJob.run();
179 } finally
180 {
181 cleanupAfterExecution();
182 }
183
184 return null;
185 }
186 }
187
188 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
189 {
190 this.parallelExecutor = parallelExecutor;
191 this.logger = logger;
192 }
193
194 @PostInjection
195 public void start(RegistryShutdownHub hub)
196 {
197 hub.addRegistryShutdownListener(new Runnable()
198 {
199 public void run()
200 {
201 registryDidShutdown();
202 }
203 });
204
205 thread.start();
206 }
207
208
209 synchronized void removeJob(Job job)
210 {
211 if (logger.isDebugEnabled())
212 {
213 logger.debug("Removing " + job);
214 }
215
216 jobs.remove(job);
217 }
218
219
220 public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job)
221 {
222 assert schedule != null;
223 assert name != null;
224 assert job != null;
225
226 Job periodicJob = new Job(schedule, name, job);
227
228 jobs.add(periodicJob);
229
230 if (logger.isDebugEnabled())
231 {
232 logger.debug("Added " + periodicJob);
233 }
234
235 // Wake the thread so that it can start the job, if necessary.
236
237 // Technically, this is only necessary if the new job is scheduled earlier
238 // than any job currently in the list of jobs, but this naive implementation
239 // is simpler.
240 thread.interrupt();
241
242 return periodicJob;
243 }
244
245 public void run()
246 {
247 while (!isShutdown())
248 {
249 long nextExecution = executeCurrentBatch();
250
251 try
252 {
253 long delay = nextExecution - System.currentTimeMillis();
254
255 if (logger.isTraceEnabled())
256 {
257 logger.trace(String.format("Sleeping for %,d ms", delay));
258 }
259
260 if (delay > 0)
261 {
262 Thread.sleep(delay);
263 }
264 } catch (InterruptedException
265 ex)
266 {
267 // Ignored; the thread is interrupted() to shut it down,
268 // or to have it execute a new batch.
269
270 logger.trace("Interrupted");
271 }
272 }
273 }
274
275 private synchronized boolean isShutdown()
276 {
277 return shutdown;
278 }
279
280 private synchronized void registryDidShutdown()
281 {
282 shutdown = true;
283
284 thread.interrupt();
285 }
286
287 /**
288 * Finds jobs and executes jobs that are ready to be executed.
289 *
290 * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
291 */
292 private synchronized long executeCurrentBatch()
293 {
294 long now = System.currentTimeMillis();
295 long nextExecution = now + FIVE_MINUTES;
296
297 for (Job job : jobs)
298 {
299 if (job.isExecuting())
300 {
301 continue;
302 }
303
304 long jobNextExecution = job.getNextExecution();
305
306 if (jobNextExecution <= now)
307 {
308 job.start();
309 } else
310 {
311 nextExecution = Math.min(nextExecution, jobNextExecution);
312 }
313 }
314
315 return nextExecution;
316 }
317
318
319 }