一、前言
本文介绍jetspeed-components源码包(2.1.4)中的任务调度组件:调度任务接口org.apache.jetspeed.scheduler.Scheduler,其抽象实现类org.apache.jetspeed.scheduler.AbstractScheduler及基于内存的实现类org.apache.jetspeed.scheduler.MemoryBasedScheduler,可根据不同场景进行差异化扩展的调度任务抽象类org.apache.jetspeed.scheduler.ScheduledJob,以及实际执行调度任务的线程工作类org.apache.jetspeed.scheduler.WorkerThread,详情参见下面的源码说明。
二、源码说明
1. 调度接口及实现类
package org.apache.jetspeed.scheduler;

import java.util.List;

/**
 * Contract for a component that keeps a set of {@link JobEntry} objects and
 * lets callers look them up, add, update, remove and list them.
 */
public interface Scheduler
{
    /** The string {@code "scheduler"}; presumably the service lookup name (confirm against callers). */
    String SERVICE_NAME = "scheduler";

    /**
     * Looks up a single job.
     *
     * @param paramInt numeric key of the job; presumably the job id (confirm against implementations)
     * @return the matching job entry, as defined by the implementation
     * @throws Exception if the implementation fails to perform the lookup
     */
    JobEntry getJob(int paramInt) throws Exception;

    /**
     * Registers a job with the scheduler.
     *
     * @param paramJobEntry the job to add
     * @throws Exception if the implementation cannot add the job
     */
    void addJob(JobEntry paramJobEntry) throws Exception;

    /**
     * Applies changes made to an already known job.
     *
     * @param paramJobEntry the job to update
     * @throws Exception if the implementation cannot update the job
     */
    void updateJob(JobEntry paramJobEntry) throws Exception;

    /**
     * Removes a job from the scheduler.
     *
     * @param paramJobEntry the job to remove
     * @throws Exception if the implementation cannot remove the job
     */
    void removeJob(JobEntry paramJobEntry) throws Exception;

    /**
     * Lists the jobs currently held by the scheduler.
     *
     * @return the jobs, in whatever form the implementation provides
     */
    List listJobs();
}
package org.apache.jetspeed.scheduler;@b@@b@public abstract class ScheduledJob@b@{@b@ public abstract void run(JobEntry paramJobEntry)@b@ throws Exception;@b@@b@ public void execute(JobEntry job)@b@ throws Exception@b@ {@b@ run(job);@b@ }@b@}
package org.apache.jetspeed.scheduler;

import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Skeleton scheduler that owns the job queue and the background thread which
 * waits for the next due job and hands it to a {@link WorkerThread}.
 *
 * <p>Subclasses are expected to create {@link #scheduleQueue} and
 * {@link #mainLoop} (neither is created here) and to implement the job
 * management methods of {@link Scheduler}.
 */
public abstract class AbstractScheduler implements Scheduler
{
    // Bound to this class; the decompiled listing used MemoryBasedScheduler.class,
    // which attributed this class's log output to the subclass.
    private static final Log log = LogFactory.getLog(AbstractScheduler.class);

    /** Queue of jobs ordered by next run time; null until a subclass creates it. */
    protected JobQueue scheduleQueue = null;

    /** Runnable executed by the scheduler thread; null until a subclass creates it. */
    protected MainLoop mainLoop;

    /** The running scheduler thread, or null when no thread is active. */
    protected Thread thread;

    /** Creates a scheduler with no main loop and no thread. */
    public AbstractScheduler()
    {
        this.mainLoop = null;
        this.thread = null;
    }

    /** Does nothing here; subclasses override it to set up the queue and main loop. */
    public void start()
    {
    }

    /** Interrupts the scheduler thread, if one is running. */
    public void stop()
    {
        // Read the field once: the main loop clears it when it exits, so a
        // second read after the null check could return null.
        Thread running = getThread();
        if (running != null)
        {
            running.interrupt();
        }
    }

    public abstract JobEntry getJob(int paramInt) throws Exception;

    public abstract void addJob(JobEntry paramJobEntry) throws Exception;

    public abstract void removeJob(JobEntry paramJobEntry) throws Exception;

    public abstract void updateJob(JobEntry paramJobEntry) throws Exception;

    /**
     * Lists the queued jobs.
     *
     * @return whatever the queue's {@code list()} returns, or {@code null}
     *         when the queue has not been created yet
     */
    public List listJobs()
    {
        if (this.scheduleQueue == null)
        {
            return null;
        }
        return this.scheduleQueue.list();
    }

    /**
     * @return the scheduler thread, or {@code null} if none is running
     */
    public synchronized Thread getThread()
    {
        return this.thread;
    }

    /** Forgets the scheduler thread; called by the main loop when it ends. */
    private synchronized void clearThread()
    {
        this.thread = null;
    }

    /**
     * Starts the scheduler thread if there is none; otherwise wakes the
     * existing thread so it re-reads the head of the queue.
     */
    public synchronized void restart()
    {
        if (this.thread == null)
        {
            this.thread = new Thread(this.mainLoop, "scheduler");
            this.thread.setDaemon(true);
            this.thread.start();
        }
        else
        {
            // Wakes a thread blocked in nextJob().
            notify();
        }
    }

    /**
     * Blocks until the job at the head of the queue is due, then recalculates
     * that job's next run time (via the queue) and returns it.
     *
     * @return the job to run now, or {@code null} once the thread has been interrupted
     * @throws Exception if recalculating the job's run time fails
     */
    private synchronized JobEntry nextJob() throws Exception
    {
        try
        {
            while (!Thread.interrupted())
            {
                JobEntry je = this.scheduleQueue.getNext();

                if (je == null)
                {
                    // Nothing queued: sleep until restart() notifies.
                    wait();
                }
                else
                {
                    long now = System.currentTimeMillis();
                    long when = je.getNextRuntime();

                    if (when > now)
                    {
                        // Head of the queue is not due yet; sleep until it is
                        // or until restart() signals a queue change.
                        wait(when - now);
                    }
                    else
                    {
                        this.scheduleQueue.updateQueue(je);
                        return je;
                    }
                }
            }
        }
        catch (InterruptedException ignored)
        {
            // Interruption is the stop signal (see stop()); fall through and
            // return null so the main loop terminates.
        }

        return null;
    }

    /**
     * Body of the scheduler thread: repeatedly fetches the next due job and
     * runs it on a fresh helper thread, until no job is returned.
     */
    protected static class MainLoop implements Runnable
    {
        private final AbstractScheduler scheduler;

        /**
         * @param scheduler the scheduler whose queue this loop serves
         */
        public MainLoop(AbstractScheduler scheduler)
        {
            this.scheduler = scheduler;
        }

        public void run()
        {
            try
            {
                while (true)
                {
                    JobEntry je = this.scheduler.nextJob();
                    if (je == null)
                    {
                        break;
                    }

                    Runnable wt = new WorkerThread(je);
                    Thread helper = new Thread(wt);
                    helper.start();
                }
            }
            catch (Exception e)
            {
                // Pass the throwable as well so the stack trace is logged.
                log.error("Error running a Scheduled Job: " + e, e);
            }
            finally
            {
                // Lets a later restart() create a new thread.
                this.scheduler.clearThread();
            }
        }
    }
}
package org.apache.jetspeed.scheduler;

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Scheduler whose jobs are read from a {@link Configuration} at start-up and
 * kept in an in-memory {@link JobQueue}.
 *
 * <p>Configuration keys read by {@link #start()}: the list {@code jobs}, and
 * for every name in it {@code job.<name>.ID} (must be present),
 * {@code job.<name>.SECOND}, {@code .MINUTE}, {@code .HOUR}, {@code .WEEKDAY}
 * and {@code .DAY_OF_MONTH} (each defaults to -1).
 */
public class MemoryBasedScheduler extends AbstractScheduler implements Scheduler
{
    private static final Log log = LogFactory.getLog(MemoryBasedScheduler.class);

    private Configuration config;

    /**
     * @param config configuration holding the job definitions
     * @throws Exception declared for callers; nothing in this constructor throws it
     */
    public MemoryBasedScheduler(Configuration config) throws Exception
    {
        this.config = config;
    }

    private Configuration getConfiguration()
    {
        return this.config;
    }

    /**
     * Creates the queue and main loop, builds a {@link JobEntry} for each
     * configured job and, if any were built, loads them and starts the
     * scheduler thread. Any failure is logged and not rethrown.
     */
    public void start()
    {
        try
        {
            super.start();
            this.scheduleQueue = new JobQueue();
            this.mainLoop = new AbstractScheduler.MainLoop(this);

            List jobProps = getConfiguration().getList("jobs");
            List jobs = new ArrayList();

            for (int i = 0; i < jobProps.size(); ++i)
            {
                String jobName = (String) jobProps.get(i);
                String jobPrefix = "job." + jobName;

                // NOTE(review): the ID property is only checked for presence;
                // it is never copied into the JobEntry, so entries built here
                // keep the default job id. Confirm whether that is intended.
                if (getConfiguration().getString(jobPrefix + ".ID", null) == null)
                {
                    throw new Exception("There is an error in the properties file. \n"
                            + jobPrefix + ".ID is not found.\n");
                }

                int sec = getConfiguration().getInt(jobPrefix + ".SECOND", -1);
                int min = getConfiguration().getInt(jobPrefix + ".MINUTE", -1);
                int hr = getConfiguration().getInt(jobPrefix + ".HOUR", -1);
                int wkday = getConfiguration().getInt(jobPrefix + ".WEEKDAY", -1);
                int dayOfMonth = getConfiguration().getInt(jobPrefix + ".DAY_OF_MONTH", -1);

                // The job name doubles as the task (class name) of the entry.
                jobs.add(new JobEntry(sec, min, hr, wkday, dayOfMonth, jobName));
            }

            if (!jobs.isEmpty())
            {
                this.scheduleQueue.batchLoad(jobs);
                restart();
            }
        }
        catch (Exception e)
        {
            log.error("Cannot initialize SchedulerService!: ", e);
        }
    }

    /** Stops the scheduler thread by delegating to the superclass. */
    public void stop()
    {
        super.stop();
    }

    /**
     * Finds the queued job whose job id equals {@code oid}.
     *
     * <p>The previous implementation built a probe entry with a null task,
     * which the {@link JobEntry} constructor rejects, so every call threw;
     * it also never used {@code oid}.
     *
     * @param oid job id to look for
     * @return the first queued entry with that id, or {@code null} if there
     *         is none or the scheduler has not been started
     * @throws Exception declared by the interface; not thrown by this lookup
     */
    public JobEntry getJob(int oid) throws Exception
    {
        if (this.scheduleQueue == null)
        {
            return null;
        }

        // list() hands back a copy of the queue, or null when it is empty.
        List snapshot = this.scheduleQueue.list();
        if (snapshot == null)
        {
            return null;
        }

        for (int i = 0; i < snapshot.size(); i++)
        {
            JobEntry candidate = (JobEntry) snapshot.get(i);
            if (candidate.getJobId() == oid)
            {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Adds a job to the queue and wakes (or starts) the scheduler thread.
     *
     * @param je the job to add
     */
    public void addJob(JobEntry je) throws Exception
    {
        this.scheduleQueue.add(je);
        restart();
    }

    /**
     * Removes a job from the queue and wakes (or starts) the scheduler thread.
     *
     * @param je the job to remove
     */
    public void removeJob(JobEntry je) throws Exception
    {
        this.scheduleQueue.remove(je);
        restart();
    }

    /**
     * Recalculates the job's next run time, re-sorts the queue and wakes (or
     * starts) the scheduler thread. A failure to recalculate is logged and
     * the queue is still re-sorted.
     *
     * @param je the job that was changed
     */
    public void updateJob(JobEntry je) throws Exception
    {
        try
        {
            je.calcRunTime();
        }
        catch (Exception e)
        {
            // Same message as before, plus the throwable so the stack trace is kept.
            log.error("Problem updating Scheduled Job: " + e, e);
        }

        this.scheduleQueue.modify(je);
        restart();
    }
}
2. 任务定义实现类
package org.apache.jetspeed.scheduler;@b@@b@public abstract class BaseJobEntry@b@{@b@ protected int jobId;@b@ protected int jobSecond;@b@ protected int jobMinute;@b@ protected int jobHour;@b@ protected int weekDay;@b@ protected int dayOfMonth;@b@ protected String task;@b@ protected String email;@b@@b@ public BaseJobEntry()@b@ {@b@ this.jobId = 0;@b@ this.jobSecond = -1;@b@ this.jobMinute = -1;@b@ this.jobHour = -1;@b@ this.weekDay = -1;@b@ this.dayOfMonth = -1;@b@ }@b@@b@ public int getJobId()@b@ {@b@ return this.jobId;@b@ }@b@@b@ public void setJobId(int v)@b@ {@b@ this.jobId = v;@b@ }@b@@b@ public int getSecond()@b@ {@b@ return this.jobSecond;@b@ }@b@@b@ public void setSecond(int v)@b@ {@b@ this.jobSecond = v;@b@ }@b@@b@ public int getMinute()@b@ {@b@ return this.jobMinute;@b@ }@b@@b@ public void setMinute(int v)@b@ {@b@ this.jobMinute = v;@b@ }@b@@b@ public int getHour()@b@ {@b@ return this.jobHour;@b@ }@b@@b@ public void setHour(int v)@b@ {@b@ this.jobHour = v;@b@ }@b@@b@ public int getWeekDay()@b@ {@b@ return this.weekDay;@b@ }@b@@b@ public void setWeekDay(int v)@b@ {@b@ this.weekDay = v;@b@ }@b@@b@ public int getDayOfMonth()@b@ {@b@ return this.dayOfMonth;@b@ }@b@@b@ public void setDayOfMonth(int v)@b@ {@b@ this.dayOfMonth = v;@b@ }@b@@b@ public String getTask()@b@ {@b@ return this.task;@b@ }@b@@b@ public void setTask(String v)@b@ {@b@ this.task = v;@b@ }@b@@b@ public String getEmail()@b@ {@b@ return this.email;@b@ }@b@@b@ public void setEmail(String v)@b@ {@b@ this.email = v;@b@ }@b@}
package org.apache.jetspeed.scheduler;

import java.util.Calendar;
import java.util.Date;

/**
 * A scheduled job together with its computed next run time and an "active"
 * flag. Which schedule fields are non-negative decides the job type (see
 * {@link #evaluateJobType()}); negative values count as "not set".
 */
public class JobEntry extends BaseJobEntry implements Comparable
{
    private boolean jobIsActive = false;

    /** Next run time in epoch milliseconds; 0 until calculated. */
    private long runtime = 0L;

    // Job types returned by evaluateJobType(). These are NOT the
    // java.util.Calendar field constants of the same names.
    private static final int SECOND = 0;
    private static final int MINUTE = 1;
    private static final int WEEK_DAY = 2;
    private static final int DAY_OF_MONTH = 3;
    private static final int DAILY = 4;

    /** Creates an entry with default field values and no run time calculated. */
    public JobEntry()
    {
    }

    /**
     * Creates an entry and immediately calculates its next run time.
     *
     * @param sec    second field, negative if unused
     * @param min    minute field, negative if unused
     * @param hour   hour field, negative if unused
     * @param wd     week-day field, negative if unused
     * @param day_mo day-of-month field, negative if unused
     * @param task   task name; must be non-empty
     * @throws Exception if {@code task} is null or empty, or the schedule
     *                   fields do not form a valid combination
     */
    public JobEntry(int sec, int min, int hour, int wd, int day_mo, String task)
        throws Exception
    {
        if ((task == null) || (task.length() == 0))
        {
            throw new Exception("Error in JobEntry. Bad Job parameter. Task not set.");
        }

        setSecond(sec);
        setMinute(min);
        setHour(hour);
        setWeekDay(wd);
        setDayOfMonth(day_mo);
        setTask(task);

        calcRunTime();
    }

    /**
     * Orders entries by job id.
     *
     * @param je the object to compare with
     * @return 0 for a {@code JobEntry} with the same id, 1 if this id is
     *         larger, -1 otherwise (including when {@code je} is not a
     *         {@code JobEntry})
     */
    public int compareTo(Object je)
    {
        if (je instanceof JobEntry)
        {
            int otherId = ((JobEntry) je).getJobId();
            if (this.jobId == otherId)
            {
                return 0;
            }
            if (this.jobId > otherId)
            {
                return 1;
            }
        }
        return -1;
    }

    /** @param isActive whether the job is currently running */
    public void setActive(boolean isActive)
    {
        this.jobIsActive = isActive;
    }

    /** @return whether the job is flagged as currently running */
    public boolean isActive()
    {
        return this.jobIsActive;
    }

    /** @return the next run time in epoch milliseconds */
    public long getNextRuntime()
    {
        return this.runtime;
    }

    /** @return the next run time formatted with {@link Date#toString()} */
    public String getNextRunAsString()
    {
        return new Date(this.runtime).toString();
    }

    /**
     * Calculates the next run time from the current time and the schedule
     * fields.
     *
     * <ul>
     * <li>second / minute jobs: now plus the configured offset;</li>
     * <li>week-day, day-of-month and daily jobs: the configured wall-clock
     *     moment, moved forward by one week / month / day when that moment
     *     is not in the future.</li>
     * </ul>
     *
     * @throws Exception if the schedule fields do not form a valid combination
     */
    public void calcRunTime() throws Exception
    {
        Calendar schedrun = Calendar.getInstance();
        Calendar now = Calendar.getInstance();

        switch (evaluateJobType())
        {
            case SECOND:
                schedrun.add(Calendar.SECOND, getSecond());
                break;

            case MINUTE:
                schedrun.add(Calendar.SECOND, getSecond());
                schedrun.add(Calendar.MINUTE, getMinute());
                break;

            case WEEK_DAY:
                schedrun.set(Calendar.SECOND, getSecond());
                schedrun.set(Calendar.MINUTE, getMinute());
                schedrun.set(Calendar.HOUR_OF_DAY, getHour());
                schedrun.set(Calendar.DAY_OF_WEEK, getWeekDay());

                if (!now.before(schedrun))
                {
                    // Already passed this week: run next week.
                    schedrun.add(Calendar.DAY_OF_WEEK, 7);
                }
                break;

            case DAY_OF_MONTH:
                // A day-of-month schedule does not require a second; an unset
                // (negative) second used to be passed straight to the lenient
                // Calendar, pulling the run time back into the previous minute.
                schedrun.set(Calendar.SECOND, Math.max(getSecond(), 0));
                schedrun.set(Calendar.MINUTE, getMinute());
                schedrun.set(Calendar.HOUR_OF_DAY, getHour());
                schedrun.set(Calendar.DAY_OF_MONTH, getDayOfMonth());

                if (!now.before(schedrun))
                {
                    // Already passed this month: run next month.
                    schedrun.add(Calendar.MONTH, 1);
                }
                break;

            case DAILY:
                schedrun.set(Calendar.SECOND, getSecond());
                schedrun.set(Calendar.MINUTE, getMinute());
                schedrun.set(Calendar.HOUR_OF_DAY, getHour());

                if (!now.before(schedrun))
                {
                    // Add one calendar day rather than 24 elapsed hours so the
                    // wall-clock time stays the same across DST changes.
                    schedrun.add(Calendar.DAY_OF_MONTH, 1);
                }
                break;

            default:
                // evaluateJobType() only returns the five types above.
                return;
        }

        this.runtime = schedrun.getTime().getTime();
    }

    /**
     * Determines the job type from which schedule fields are non-negative,
     * checking the most specific field first.
     *
     * @return one of {@code DAY_OF_MONTH}, {@code WEEK_DAY}, {@code DAILY},
     *         {@code MINUTE} or {@code SECOND}
     * @throws Exception if a field required by the detected type is negative
     */
    private int evaluateJobType() throws Exception
    {
        if (getDayOfMonth() >= 0)
        {
            // Second is intentionally not required here.
            requireSet(getMinute() >= 0 && getHour() >= 0);
            return DAY_OF_MONTH;
        }

        if (getWeekDay() >= 0)
        {
            requireSet(getMinute() >= 0 && getHour() >= 0 && getSecond() >= 0);
            return WEEK_DAY;
        }

        if (getHour() >= 0)
        {
            requireSet(getMinute() >= 0 && getSecond() >= 0);
            return DAILY;
        }

        if (getMinute() >= 0)
        {
            requireSet(getSecond() >= 0);
            return MINUTE;
        }

        requireSet(getSecond() >= 0);
        return SECOND;
    }

    /** Throws the "bad parameter" exception unless the required fields are set. */
    private static void requireSet(boolean allSet) throws Exception
    {
        if (!allSet)
        {
            throw new Exception("Error in JobEntry. Bad Job parameter.");
        }
    }
}
package org.apache.jetspeed.scheduler;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Vector;

/**
 * Queue of {@link JobEntry} objects kept sorted by next run time, earliest
 * first. All public methods lock the queue instance, so a size check and the
 * element access that follows it cannot be separated by a concurrent change.
 */
public class JobQueue
{
    private Vector queue = null;

    /**
     * Creates an empty queue.
     *
     * @throws Exception declared for callers; nothing in this constructor throws it
     */
    public JobQueue() throws Exception
    {
        this.queue = new Vector(10);
    }

    /**
     * @return the entry at the head of the queue, or {@code null} if the
     *         queue is empty
     */
    public synchronized JobEntry getNext()
    {
        if (this.queue.isEmpty())
        {
            return null;
        }
        return (JobEntry) this.queue.elementAt(0);
    }

    /**
     * Looks up an entry using {@code Vector.indexOf}, i.e. by
     * {@code equals()}.
     *
     * @param je the entry to look for; may be {@code null}
     * @return the queued entry equal to {@code je}, or {@code null} if
     *         {@code je} is {@code null} or not queued
     */
    public synchronized JobEntry getJob(JobEntry je)
    {
        if (je == null)
        {
            return null;
        }

        int index = this.queue.indexOf(je);
        if (index < 0)
        {
            return null;
        }
        return (JobEntry) this.queue.elementAt(index);
    }

    /**
     * @return a copy of the queue, or {@code null} when the queue is empty
     */
    public synchronized Vector list()
    {
        if ((this.queue != null) && (this.queue.size() > 0))
        {
            return (Vector) this.queue.clone();
        }
        return null;
    }

    /**
     * Adds an entry and re-sorts the queue.
     *
     * @param je the entry to add
     */
    public synchronized void add(JobEntry je)
    {
        this.queue.addElement(je);
        sortQueue();
    }

    /**
     * Adds all given entries and re-sorts the queue; a {@code null} list is
     * ignored.
     *
     * @param jobEntries entries to add
     */
    public synchronized void batchLoad(List jobEntries)
    {
        if (jobEntries != null)
        {
            this.queue.addAll(jobEntries);
            sortQueue();
        }
    }

    /**
     * Removes an entry and re-sorts the queue.
     *
     * @param je the entry to remove
     */
    public synchronized void remove(JobEntry je)
    {
        this.queue.removeElement(je);
        sortQueue();
    }

    /**
     * Re-sorts the queue after an entry was changed by the caller.
     *
     * @param je the changed entry (not otherwise used)
     */
    public synchronized void modify(JobEntry je)
    {
        sortQueue();
    }

    /**
     * Recalculates the entry's next run time and re-sorts the queue.
     *
     * @param je the entry to recalculate
     * @throws Exception if the run time cannot be calculated
     */
    public synchronized void updateQueue(JobEntry je) throws Exception
    {
        je.calcRunTime();
        sortQueue();
    }

    /** Sorts the queue by ascending next run time. Callers hold the queue lock. */
    private void sortQueue()
    {
        Comparator byNextRuntime = new Comparator()
        {
            public int compare(Object o1, Object o2)
            {
                long time1 = ((JobEntry) o1).getNextRuntime();
                long time2 = ((JobEntry) o2).getNextRuntime();
                // Explicit comparison: no boxing, and no overflow as a
                // subtraction could have.
                return (time1 < time2) ? -1 : ((time1 == time2) ? 0 : 1);
            }
        };
        Collections.sort(this.queue, byNextRuntime);
    }
}
package org.apache.jetspeed.scheduler;@b@@b@public abstract class ScheduledJob@b@{@b@ public abstract void run(JobEntry paramJobEntry)@b@ throws Exception;@b@@b@ public void execute(JobEntry job)@b@ throws Exception@b@ {@b@ run(job);@b@ }@b@}
3. 调度任务线程引擎类
package org.apache.jetspeed.scheduler;@b@@b@public class WorkerThread@b@ implements Runnable@b@{@b@ private JobEntry je = null;@b@@b@ public WorkerThread(JobEntry je)@b@ {@b@ this.je = je;@b@ }@b@@b@ public void run()@b@ {@b@ if ((this.je == null) || (this.je.isActive()))@b@ {@b@ return;@b@ }@b@@b@ try@b@ {@b@ if (!(this.je.isActive()))@b@ {@b@ this.je.setActive(true);@b@ logStateChange("started");@b@@b@ String className = this.je.getTask();@b@@b@ ScheduledJob sc = (ScheduledJob)Class.forName(className).newInstance();@b@ sc.execute(this.je);@b@ }@b@@b@ }@b@ catch (Exception e)@b@ {@b@ }@b@ finally@b@ {@b@ if (this.je.isActive())@b@ {@b@ this.je.setActive(false);@b@ logStateChange("completed");@b@ }@b@ }@b@ }@b@@b@ private final void logStateChange(String state)@b@ {@b@ }@b@}