001// Licensed under the Apache License, Version 2.0 (the "License");
002// you may not use this file except in compliance with the License.
003// You may obtain a copy of the License at
004//
005// http://www.apache.org/licenses/LICENSE-2.0
006//
007// Unless required by applicable law or agreed to in writing, software
008// distributed under the License is distributed on an "AS IS" BASIS,
009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
010// See the License for the specific language governing permissions and
011// limitations under the License.
012
013package org.apache.tapestry5.ioc.internal.services.cron;
014
015import org.apache.tapestry5.ioc.Invokable;
016import org.apache.tapestry5.ioc.annotations.PostInjection;
017import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
018import org.apache.tapestry5.ioc.services.ParallelExecutor;
019import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
021import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
022import org.apache.tapestry5.ioc.services.cron.Schedule;
023import org.slf4j.Logger;
024
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Set;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032
033public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
034{
035    private final ParallelExecutor parallelExecutor;
036
037    private final Logger logger;
038
039    // Synchronized by jobLock
040    private final List<Job> jobs = CollectionFactory.newList();
041
042    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
043
044    private transient boolean shutdown;
045
046    private static final long FIVE_MINUTES = 5 * 60 * 1000;
047
048    private final AtomicInteger jobIdAllocator = new AtomicInteger();
049
050    private final Lock jobLock = new ReentrantLock();
051
052    private class Job implements PeriodicJob, Invokable<Void>
053    {
054        final int jobId = jobIdAllocator.incrementAndGet();
055
056        private final Schedule schedule;
057
058        private final String name;
059
060        private final Runnable runnableJob;
061
062        private boolean executing, canceled;
063
064        private long nextExecution;
065
066        public Job(Schedule schedule, String name, Runnable runnableJob)
067        {
068            this.schedule = schedule;
069            this.name = name;
070            this.runnableJob = runnableJob;
071
072            nextExecution = schedule.firstExecution();
073        }
074
075        @Override
076        public String getName()
077        {
078            return name;
079        }
080
081        public long getNextExecution()
082        {
083            try
084            {
085                jobLock.lock();
086                return nextExecution;
087            } finally
088            {
089                jobLock.unlock();
090            }
091        }
092
093
094        @Override
095        public boolean isExecuting()
096        {
097            try
098            {
099                jobLock.lock();
100                return executing;
101            } finally
102            {
103                jobLock.unlock();
104            }
105        }
106
107        @Override
108        public boolean isCanceled()
109        {
110            try
111            {
112                jobLock.lock();
113                return canceled;
114            } finally
115            {
116                jobLock.unlock();
117            }
118        }
119
120        @Override
121        public void cancel()
122        {
123            try
124            {
125                jobLock.lock();
126
127                canceled = true;
128
129                if (!executing)
130                {
131                    removeJob(this);
132                }
133
134                // Otherwise, it will be caught when the job finishes execution.
135            } finally
136            {
137                jobLock.unlock();
138            }
139        }
140
141        @Override
142        public String toString()
143        {
144            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
145
146            builder.append(", (").append(name).append(')');
147
148            if (executing)
149            {
150                builder.append(", executing");
151            }
152
153            if (canceled)
154            {
155                builder.append(", canceled");
156            } else
157            {
158                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
159            }
160
161            return builder.append(']').toString();
162        }
163
164        /**
165         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
166         * and uses the ParallelExecutor to run the job.
167         */
168        void start()
169        {
170            try
171            {
172                jobLock.lock();
173                executing = true;
174
175                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
176                // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
177                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
178
179                nextExecution = schedule.nextExecution(nextExecution);
180
181                parallelExecutor.invoke(this);
182            } finally
183            {
184                jobLock.unlock();
185            }
186
187            if (logger.isTraceEnabled())
188            {
189                logger.trace(this + " sent for execution");
190            }
191        }
192
193        void cleanupAfterExecution()
194        {
195            try
196            {
197                if (logger.isTraceEnabled())
198                {
199                    logger.trace(this + " execution complete");
200                }
201
202                executing = false;
203
204                if (canceled)
205                {
206                    removeJob(this);
207                } else
208                {
209                    // Again, naive but necessary.
210                    thread.interrupt();
211                }
212            } finally
213            {
214                jobLock.unlock();
215            }
216        }
217
218        @Override
219        public Void invoke()
220        {
221            if (logger.isDebugEnabled())
222            {
223                logger.debug(String.format("Executing job #%d (%s)", jobId, name));
224            }
225
226            try
227            {
228                runnableJob.run();
229            } finally
230            {
231                cleanupAfterExecution();
232            }
233
234            return null;
235        }
236
237    }
238
239    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
240    {
241        this.parallelExecutor = parallelExecutor;
242        this.logger = logger;
243    }
244
245    @PostInjection
246    public void start(RegistryShutdownHub hub)
247    {
248        hub.addRegistryShutdownListener(new Runnable()
249        {
250            @Override
251            public void run()
252            {
253                registryDidShutdown();
254            }
255        });
256
257        thread.start();
258    }
259
260
261    void removeJob(Job job)
262    {
263        if (logger.isDebugEnabled())
264        {
265            logger.debug("Removing " + job);
266        }
267
268        try
269        {
270            jobLock.lock();
271            jobs.remove(job);
272        } finally
273        {
274            jobLock.unlock();
275        }
276    }
277
278
279    @Override
280    public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
281    {
282        assert schedule != null;
283        assert name != null;
284        assert job != null;
285
286        Job periodicJob = new Job(schedule, name, job);
287
288        try
289        {
290            jobLock.lock();
291
292            jobs.add(periodicJob);
293        } finally
294        {
295            jobLock.unlock();
296        }
297
298        if (logger.isDebugEnabled())
299        {
300            logger.debug("Added " + periodicJob);
301        }
302
303        // Wake the thread so that it can start the job, if necessary.
304
305        // Technically, this is only necessary if the new job is scheduled earlier
306        // than any job currently in the list of jobs, but this naive implementation
307        // is simpler.
308        thread.interrupt();
309
310        return periodicJob;
311    }
312
313    @Override
314    public void run()
315    {
316        while (!shutdown)
317        {
318            long nextExecution = executeCurrentBatch();
319
320            try
321            {
322                long delay = nextExecution - System.currentTimeMillis();
323
324                if (logger.isTraceEnabled())
325                {
326                    logger.trace(String.format("Sleeping for %,d ms", delay));
327                }
328
329                if (delay > 0)
330                {
331                    Thread.sleep(delay);
332                }
333            } catch (InterruptedException
334                    ex)
335            {
336                // Ignored; the thread is interrupted() to shut it down,
337                // or to have it execute a new batch.
338
339                logger.trace("Interrupted");
340            }
341        }
342    }
343
344    private void registryDidShutdown()
345    {
346        shutdown = true;
347
348        thread.interrupt();
349    }
350
351    /**
352     * Finds jobs and executes jobs that are ready to be executed.
353     *
354     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
355     */
356    private long executeCurrentBatch()
357    {
358        long now = System.currentTimeMillis();
359        long nextExecution = now + FIVE_MINUTES;
360
361        try
362        {
363            jobLock.lock();
364            // TAP5-2455
365            Set<Job> jobsToCancel = null;
366
367            for (Job job : jobs)
368            {
369                if (job.isExecuting())
370                {
371                    continue;
372                }
373
374                long jobNextExecution = job.getNextExecution();
375
376                if (jobNextExecution <= 0)
377                {
378                    if (jobsToCancel == null)
379                    {
380                        jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>();
381                    }
382                    jobsToCancel.add(job);
383                } else if (jobNextExecution <= now)
384                {
385                    job.start();
386                } else
387                {
388                    nextExecution = Math.min(nextExecution, jobNextExecution);
389                }
390            }
391            if (jobsToCancel != null)
392            {
393                for (Job job : jobsToCancel)
394                {
395                    job.cancel();
396                }
397            }
398        } finally
399        {
400            jobLock.unlock();
401        }
402
403        return nextExecution;
404    }
405
406
407}