001// Licensed under the Apache License, Version 2.0 (the "License");
002// you may not use this file except in compliance with the License.
003// You may obtain a copy of the License at
004//
005// http://www.apache.org/licenses/LICENSE-2.0
006//
007// Unless required by applicable law or agreed to in writing, software
008// distributed under the License is distributed on an "AS IS" BASIS,
009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
010// See the License for the specific language governing permissions and
011// limitations under the License.
012
013package org.apache.tapestry5.ioc.internal.services.cron;
014
015import org.apache.tapestry5.commons.util.CollectionFactory;
016import org.apache.tapestry5.ioc.Invokable;
017import org.apache.tapestry5.ioc.annotations.PostInjection;
018import org.apache.tapestry5.ioc.services.ParallelExecutor;
019import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
021import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
022import org.apache.tapestry5.ioc.services.cron.Schedule;
023import org.slf4j.Logger;
024
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032
033public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
034{
035    private final ParallelExecutor parallelExecutor;
036
037    private final Logger logger;
038
039    // Synchronized by jobLock
040    private final List<Job> jobs = CollectionFactory.newList();
041
042    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
043
044    private transient boolean shutdown;
045
046    private static final long FIVE_MINUTES = 5 * 60 * 1000;
047
048    private final AtomicInteger jobIdAllocator = new AtomicInteger();
049    
050    private final AtomicBoolean started = new AtomicBoolean();
051
052    private final Lock jobLock = new ReentrantLock();
053
054    private class Job implements PeriodicJob, Invokable<Void>
055    {
056        final int jobId = jobIdAllocator.incrementAndGet();
057
058        private final Schedule schedule;
059
060        private final String name;
061
062        private final Runnable runnableJob;
063
064        private boolean executing, canceled;
065
066        private long nextExecution;
067
068        public Job(Schedule schedule, String name, Runnable runnableJob)
069        {
070            this.schedule = schedule;
071            this.name = name;
072            this.runnableJob = runnableJob;
073
074            nextExecution = schedule.firstExecution();
075        }
076
077        @Override
078        public String getName()
079        {
080            return name;
081        }
082
083        public long getNextExecution()
084        {
085            try
086            {
087                jobLock.lock();
088                return nextExecution;
089            } finally
090            {
091                jobLock.unlock();
092            }
093        }
094
095
096        @Override
097        public boolean isExecuting()
098        {
099            try
100            {
101                jobLock.lock();
102                return executing;
103            } finally
104            {
105                jobLock.unlock();
106            }
107        }
108
109        @Override
110        public boolean isCanceled()
111        {
112            try
113            {
114                jobLock.lock();
115                return canceled;
116            } finally
117            {
118                jobLock.unlock();
119            }
120        }
121
122        @Override
123        public void cancel()
124        {
125            try
126            {
127                jobLock.lock();
128
129                canceled = true;
130
131                if (!executing)
132                {
133                    removeJob(this);
134                }
135
136                // Otherwise, it will be caught when the job finishes execution.
137            } finally
138            {
139                jobLock.unlock();
140            }
141        }
142
143        @Override
144        public String toString()
145        {
146            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
147
148            builder.append(", (").append(name).append(')');
149
150            if (executing)
151            {
152                builder.append(", executing");
153            }
154
155            if (canceled)
156            {
157                builder.append(", canceled");
158            } else
159            {
160                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
161            }
162
163            return builder.append(']').toString();
164        }
165
166        /**
167         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
168         * and uses the ParallelExecutor to run the job.
169         */
170        void start()
171        {
172            try
173            {
174                jobLock.lock();
175                executing = true;
176
177                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
178                // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
179                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
180
181                nextExecution = schedule.nextExecution(nextExecution);
182
183                parallelExecutor.invoke(this);
184            } finally
185            {
186                jobLock.unlock();
187            }
188
189            if (logger.isTraceEnabled())
190            {
191                logger.trace(this + " sent for execution");
192            }
193        }
194
195        void cleanupAfterExecution()
196        {
197            try
198            {
199                if (logger.isTraceEnabled())
200                {
201                    logger.trace(this + " execution complete");
202                }
203
204                executing = false;
205
206                if (canceled)
207                {
208                    removeJob(this);
209                } else
210                {
211                    // Again, naive but necessary.
212                    thread.interrupt();
213                }
214            } finally
215            {
216                jobLock.unlock();
217            }
218        }
219
220        @Override
221        public Void invoke()
222        {
223            logger.debug("Executing job #{} ({})", jobId, name);
224
225            try
226            {
227                runnableJob.run();
228            } finally
229            {
230                cleanupAfterExecution();
231            }
232
233            return null;
234        }
235
236    }
237
238    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
239    {
240        this.parallelExecutor = parallelExecutor;
241        this.logger = logger;
242    }
243
244    @PostInjection
245    public void start(RegistryShutdownHub hub)
246    {
247        hub.addRegistryShutdownListener(new Runnable()
248        {
249            @Override
250            public void run()
251            {
252                registryDidShutdown();
253            }
254        });
255
256    }
257
258    public void init()
259    {
260        if (!started.get())
261        {
262            started.set(true);
263            thread.start();
264        }
265    }
266
267    void removeJob(Job job)
268    {
269        if (logger.isDebugEnabled())
270        {
271            logger.debug("Removing " + job);
272        }
273
274        try
275        {
276            jobLock.lock();
277            jobs.remove(job);
278        } finally
279        {
280            jobLock.unlock();
281        }
282    }
283
284
285    @Override
286    public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
287    {
288        assert schedule != null;
289        assert name != null;
290        assert job != null;
291
292        Job periodicJob = new Job(schedule, name, job);
293
294        try
295        {
296            jobLock.lock();
297
298            jobs.add(periodicJob);
299        } finally
300        {
301            jobLock.unlock();
302        }
303
304        if (logger.isDebugEnabled())
305        {
306            logger.debug("Added " + periodicJob);
307        }
308
309        // Wake the thread so that it can start the job, if necessary.
310
311        // Technically, this is only necessary if the new job is scheduled earlier
312        // than any job currently in the list of jobs, but this naive implementation
313        // is simpler.
314        thread.interrupt();
315
316        return periodicJob;
317    }
318
319    @Override
320    public void run()
321    {
322        while (!shutdown)
323        {
324            long nextExecution = executeCurrentBatch();
325
326            try
327            {
328                long delay = nextExecution - System.currentTimeMillis();
329
330                if (logger.isTraceEnabled())
331                {
332                    logger.trace(String.format("Sleeping for %,d ms", delay));
333                }
334
335                if (delay > 0)
336                {
337                    Thread.sleep(delay);
338                }
339            } catch (InterruptedException
340                    ex)
341            {
342                // Ignored; the thread is interrupted() to shut it down,
343                // or to have it execute a new batch.
344
345                logger.trace("Interrupted");
346            }
347        }
348    }
349
350    private void registryDidShutdown()
351    {
352        shutdown = true;
353
354        thread.interrupt();
355    }
356
357    /**
358     * Finds jobs and executes jobs that are ready to be executed.
359     *
360     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
361     */
362    private long executeCurrentBatch()
363    {
364        long now = System.currentTimeMillis();
365        long nextExecution = now + FIVE_MINUTES;
366
367        try
368        {
369            jobLock.lock();
370            // TAP5-2455
371            Set<Job> jobsToCancel = null;
372
373            for (Job job : jobs)
374            {
375                if (job.isExecuting())
376                {
377                    continue;
378                }
379
380                long jobNextExecution = job.getNextExecution();
381
382                if (jobNextExecution <= 0)
383                {
384                    if (jobsToCancel == null)
385                    {
386                        jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>();
387                    }
388                    jobsToCancel.add(job);
389                } else if (jobNextExecution <= now)
390                {
391                    job.start();
392                } else
393                {
394                    nextExecution = Math.min(nextExecution, jobNextExecution);
395                }
396            }
397            if (jobsToCancel != null)
398            {
399                for (Job job : jobsToCancel)
400                {
401                    job.cancel();
402                }
403            }
404        } finally
405        {
406            jobLock.unlock();
407        }
408
409        return nextExecution;
410    }
411
412
413}