001// Copyright 2011 The Apache Software Foundation
002//
003// Licensed under the Apache License, Version 2.0 (the "License");
004// you may not use this file except in compliance with the License.
005// You may obtain a copy of the License at
006//
007// http://www.apache.org/licenses/LICENSE-2.0
008//
009// Unless required by applicable law or agreed to in writing, software
010// distributed under the License is distributed on an "AS IS" BASIS,
011// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012// See the License for the specific language governing permissions and
013// limitations under the License.
014
015package org.apache.tapestry5.ioc.internal.services.cron;
016
017import org.apache.tapestry5.ioc.Invokable;
018import org.apache.tapestry5.ioc.annotations.PostInjection;
019import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
020import org.apache.tapestry5.ioc.services.ParallelExecutor;
021import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
022import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
023import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
024import org.apache.tapestry5.ioc.services.cron.Schedule;
025import org.slf4j.Logger;
026
027import java.util.List;
028import java.util.concurrent.atomic.AtomicInteger;
029
030public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
031{
032    private final ParallelExecutor parallelExecutor;
033
034    private final Logger logger;
035
036    // Synchronized by this
037    private final List<Job> jobs = CollectionFactory.newList();
038
039    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
040
041    // Synchronized by this. Set when the registry is shutdown.
042    private boolean shutdown;
043
044    private static final long FIVE_MINUTES = 5 * 60 * 1000;
045
046    private final AtomicInteger jobIdAllocator = new AtomicInteger();
047
048    private class Job implements PeriodicJob, Invokable<Void>
049    {
050        final int jobId = jobIdAllocator.incrementAndGet();
051
052        private final Schedule schedule;
053
054        private final String name;
055
056        private final Runnable runnableJob;
057
058        private boolean executing, canceled;
059
060        private long nextExecution;
061
062        public Job(Schedule schedule, String name, Runnable runnableJob)
063        {
064            this.schedule = schedule;
065            this.name = name;
066            this.runnableJob = runnableJob;
067
068            nextExecution = schedule.firstExecution();
069        }
070
071        @Override
072        public String getName()
073        {
074            return name;
075        }
076
077        public synchronized long getNextExecution()
078        {
079            return nextExecution;
080        }
081
082
083        @Override
084        public synchronized boolean isExecuting()
085        {
086            return executing;
087        }
088
089        @Override
090        public synchronized boolean isCanceled()
091        {
092            return canceled;
093        }
094
095        @Override
096        public synchronized void cancel()
097        {
098            canceled = true;
099
100            if (!executing)
101            {
102                removeJob(this);
103            }
104
105            // Otherwise, it will be caught when the job finishes execution.
106        }
107
108        @Override
109        public synchronized String toString()
110        {
111            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
112
113
114            builder.append(", (").append(name).append(")");
115
116            if (executing)
117            {
118                builder.append(", executing");
119            }
120
121            if (canceled)
122            {
123                builder.append(", canceled");
124            } else
125            {
126                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
127            }
128
129            return builder.append("]").toString();
130        }
131
132        /**
133         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
134         * and uses the ParallelExecutor to run the job.
135         */
136        synchronized void start()
137        {
138            executing = true;
139
140            // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
141            // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
142            // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
143
144            nextExecution = schedule.nextExecution(nextExecution);
145
146            parallelExecutor.invoke(this);
147
148            if (logger.isTraceEnabled())
149            {
150                logger.trace(this + " sent for execution");
151            }
152        }
153
154        synchronized void cleanupAfterExecution()
155        {
156            if (logger.isTraceEnabled())
157            {
158                logger.trace(this + " execution complete");
159            }
160
161            executing = false;
162
163            if (canceled)
164            {
165                removeJob(this);
166            } else
167            {
168                // Again, naive but necessary.
169                thread.interrupt();
170            }
171        }
172
173        @Override
174        public Void invoke()
175        {
176            if (logger.isDebugEnabled())
177            {
178                logger.debug(String.format("Executing job #%d (%s)", jobId, name));
179            }
180
181            try
182            {
183                runnableJob.run();
184            } finally
185            {
186                cleanupAfterExecution();
187            }
188
189            return null;
190        }
191    }
192
193    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
194    {
195        this.parallelExecutor = parallelExecutor;
196        this.logger = logger;
197    }
198
199    @PostInjection
200    public void start(RegistryShutdownHub hub)
201    {
202        hub.addRegistryShutdownListener(new Runnable()
203        {
204            @Override
205            public void run()
206            {
207                registryDidShutdown();
208            }
209        });
210
211        thread.start();
212    }
213
214
215    synchronized void removeJob(Job job)
216    {
217        if (logger.isDebugEnabled())
218        {
219            logger.debug("Removing " + job);
220        }
221
222        jobs.remove(job);
223    }
224
225
226    @Override
227    public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job)
228    {
229        assert schedule != null;
230        assert name != null;
231        assert job != null;
232
233        Job periodicJob = new Job(schedule, name, job);
234
235        jobs.add(periodicJob);
236
237        if (logger.isDebugEnabled())
238        {
239            logger.debug("Added " + periodicJob);
240        }
241
242        // Wake the thread so that it can start the job, if necessary.
243
244        // Technically, this is only necessary if the new job is scheduled earlier
245        // than any job currently in the list of jobs, but this naive implementation
246        // is simpler.
247        thread.interrupt();
248
249        return periodicJob;
250    }
251
252    @Override
253    public void run()
254    {
255        while (!isShutdown())
256        {
257            long nextExecution = executeCurrentBatch();
258
259            try
260            {
261                long delay = nextExecution - System.currentTimeMillis();
262
263                if (logger.isTraceEnabled())
264                {
265                    logger.trace(String.format("Sleeping for %,d ms", delay));
266                }
267
268                if (delay > 0)
269                {
270                    Thread.sleep(delay);
271                }
272            } catch (InterruptedException
273                    ex)
274            {
275                // Ignored; the thread is interrupted() to shut it down,
276                // or to have it execute a new batch.
277
278                logger.trace("Interrupted");
279            }
280        }
281    }
282
283    private synchronized boolean isShutdown()
284    {
285        return shutdown;
286    }
287
288    private synchronized void registryDidShutdown()
289    {
290        shutdown = true;
291
292        thread.interrupt();
293    }
294
295    /**
296     * Finds jobs and executes jobs that are ready to be executed.
297     *
298     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
299     */
300    private synchronized long executeCurrentBatch()
301    {
302        long now = System.currentTimeMillis();
303        long nextExecution = now + FIVE_MINUTES;
304
305        for (Job job : jobs)
306        {
307            if (job.isExecuting())
308            {
309                continue;
310            }
311
312            long jobNextExecution = job.getNextExecution();
313
314            if (jobNextExecution <= now)
315            {
316                job.start();
317            } else
318            {
319                nextExecution = Math.min(nextExecution, jobNextExecution);
320            }
321        }
322
323        return nextExecution;
324    }
325
326
327}