001    // Copyright 2011 The Apache Software Foundation
002    //
003    // Licensed under the Apache License, Version 2.0 (the "License");
004    // you may not use this file except in compliance with the License.
005    // You may obtain a copy of the License at
006    //
007    // http://www.apache.org/licenses/LICENSE-2.0
008    //
009    // Unless required by applicable law or agreed to in writing, software
010    // distributed under the License is distributed on an "AS IS" BASIS,
011    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012    // See the License for the specific language governing permissions and
013    // limitations under the License.
014    
015    package org.apache.tapestry5.ioc.internal.services.cron;
016    
017    import org.apache.tapestry5.ioc.Invokable;
018    import org.apache.tapestry5.ioc.annotations.PostInjection;
019    import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
020    import org.apache.tapestry5.ioc.services.ParallelExecutor;
021    import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
022    import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
023    import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
024    import org.apache.tapestry5.ioc.services.cron.Schedule;
025    import org.slf4j.Logger;
026    
027    import java.util.List;
028    import java.util.concurrent.atomic.AtomicInteger;
029    
030    public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
031    {
032        private final ParallelExecutor parallelExecutor;
033    
034        private final Logger logger;
035    
036        // Synchronized by this
037        private final List<Job> jobs = CollectionFactory.newList();
038    
039        private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
040    
041        // Synchronized by this. Set when the registry is shutdown.
042        private boolean shutdown;
043    
044        private static final long FIVE_MINUTES = 5 * 60 * 1000;
045    
046        private final AtomicInteger jobIdAllocator = new AtomicInteger();
047    
048        private class Job implements PeriodicJob, Invokable<Void>
049        {
050            final int jobId = jobIdAllocator.incrementAndGet();
051    
052            private final Schedule schedule;
053    
054            private final String name;
055    
056            private final Runnable runnableJob;
057    
058            private boolean executing, canceled;
059    
060            private long nextExecution;
061    
062            public Job(Schedule schedule, String name, Runnable runnableJob)
063            {
064                this.schedule = schedule;
065                this.name = name;
066                this.runnableJob = runnableJob;
067    
068                nextExecution = schedule.firstExecution();
069            }
070    
071            public String getName()
072            {
073                return name;
074            }
075    
076            public synchronized long getNextExecution()
077            {
078                return nextExecution;
079            }
080    
081    
082            public synchronized boolean isExecuting()
083            {
084                return executing;
085            }
086    
087            public synchronized boolean isCanceled()
088            {
089                return canceled;
090            }
091    
092            public synchronized void cancel()
093            {
094                canceled = true;
095    
096                if (!executing)
097                {
098                    removeJob(this);
099                }
100    
101                // Otherwise, it will be caught when the job finishes execution.
102            }
103    
104            @Override
105            public synchronized String toString()
106            {
107                StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
108    
109    
110                builder.append(", (").append(name).append(")");
111    
112                if (executing)
113                {
114                    builder.append(", executing");
115                }
116    
117                if (canceled)
118                {
119                    builder.append(", canceled");
120                } else
121                {
122                    builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
123                }
124    
125                return builder.append("]").toString();
126            }
127    
128            /**
129             * Starts execution of the job; this sets the executing flag, calculates the next execution time,
130             * and uses the ParallelExecutor to run the job.
131             */
132            synchronized void start()
133            {
134                executing = true;
135    
136                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
137                // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
138                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
139    
140                nextExecution = schedule.nextExecution(nextExecution);
141    
142                parallelExecutor.invoke(this);
143    
144                if (logger.isTraceEnabled())
145                {
146                    logger.trace(this + " sent for execution");
147                }
148            }
149    
150            synchronized void cleanupAfterExecution()
151            {
152                if (logger.isTraceEnabled())
153                {
154                    logger.trace(this + " execution complete");
155                }
156    
157                executing = false;
158    
159                if (canceled)
160                {
161                    removeJob(this);
162                } else
163                {
164                    // Again, naive but necessary.
165                    thread.interrupt();
166                }
167            }
168    
169            public Void invoke()
170            {
171                if (logger.isDebugEnabled())
172                {
173                    logger.debug(String.format("Executing job #%d (%s)", jobId, name));
174                }
175    
176                try
177                {
178                    runnableJob.run();
179                } finally
180                {
181                    cleanupAfterExecution();
182                }
183    
184                return null;
185            }
186        }
187    
188        public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
189        {
190            this.parallelExecutor = parallelExecutor;
191            this.logger = logger;
192        }
193    
194        @PostInjection
195        public void start(RegistryShutdownHub hub)
196        {
197            hub.addRegistryShutdownListener(new Runnable()
198            {
199                public void run()
200                {
201                    registryDidShutdown();
202                }
203            });
204    
205            thread.start();
206        }
207    
208    
209        synchronized void removeJob(Job job)
210        {
211            if (logger.isDebugEnabled())
212            {
213                logger.debug("Removing " + job);
214            }
215    
216            jobs.remove(job);
217        }
218    
219    
220        public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job)
221        {
222            assert schedule != null;
223            assert name != null;
224            assert job != null;
225    
226            Job periodicJob = new Job(schedule, name, job);
227    
228            jobs.add(periodicJob);
229    
230            if (logger.isDebugEnabled())
231            {
232                logger.debug("Added " + periodicJob);
233            }
234    
235            // Wake the thread so that it can start the job, if necessary.
236    
237            // Technically, this is only necessary if the new job is scheduled earlier
238            // than any job currently in the list of jobs, but this naive implementation
239            // is simpler.
240            thread.interrupt();
241    
242            return periodicJob;
243        }
244    
245        public void run()
246        {
247            while (!isShutdown())
248            {
249                long nextExecution = executeCurrentBatch();
250    
251                try
252                {
253                    long delay = nextExecution - System.currentTimeMillis();
254    
255                    if (logger.isTraceEnabled())
256                    {
257                        logger.trace(String.format("Sleeping for %,d ms", delay));
258                    }
259    
260                    if (delay > 0)
261                    {
262                        Thread.sleep(delay);
263                    }
264                } catch (InterruptedException
265                        ex)
266                {
267                    // Ignored; the thread is interrupted() to shut it down,
268                    // or to have it execute a new batch.
269    
270                    logger.trace("Interrupted");
271                }
272            }
273        }
274    
275        private synchronized boolean isShutdown()
276        {
277            return shutdown;
278        }
279    
280        private synchronized void registryDidShutdown()
281        {
282            shutdown = true;
283    
284            thread.interrupt();
285        }
286    
287        /**
288         * Finds jobs and executes jobs that are ready to be executed.
289         *
290         * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
291         */
292        private synchronized long executeCurrentBatch()
293        {
294            long now = System.currentTimeMillis();
295            long nextExecution = now + FIVE_MINUTES;
296    
297            for (Job job : jobs)
298            {
299                if (job.isExecuting())
300                {
301                    continue;
302                }
303    
304                long jobNextExecution = job.getNextExecution();
305    
306                if (jobNextExecution <= now)
307                {
308                    job.start();
309                } else
310                {
311                    nextExecution = Math.min(nextExecution, jobNextExecution);
312                }
313            }
314    
315            return nextExecution;
316        }
317    
318    
319    }