首页

基于jetspeed组件components源码包Scheduler调度任务简单设计实现源码示例说明

标签:jetspeed,components,Scheduler,调度任务,定时任务,apache     发布时间:2018-08-20   

一、前言

关于jetspeed-components源码包(2.1.4)组件定义调度任务接口org.apache.jetspeed.scheduler.Scheduler、调度任务抽象实现类org.apache.jetspeed.scheduler.AbstractScheduler及org.apache.jetspeed.scheduler.MemoryBasedScheduler、调度任务抽象类用于根据不同场景差异化扩展org.apache.jetspeed.scheduler.ScheduledJob、实现整个任务调度的线程工作类org.apache.jetspeed.scheduler。WorkerThread,详情参见源码说明。

二、源码说明

1. 调度接口及实现类

package org.apache.jetspeed.scheduler;@b@@b@import java.util.List;@b@@b@public abstract interface Scheduler@b@{@b@  public static final String SERVICE_NAME = "scheduler";@b@@b@  public abstract JobEntry getJob(int paramInt)@b@    throws Exception;@b@@b@  public abstract void addJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public abstract void updateJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public abstract void removeJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public abstract List listJobs();@b@}
package org.apache.jetspeed.scheduler;@b@@b@public abstract class ScheduledJob@b@{@b@  public abstract void run(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public void execute(JobEntry job)@b@    throws Exception@b@  {@b@    run(job);@b@  }@b@}
package org.apache.jetspeed.scheduler;@b@@b@import java.util.List;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@@b@public abstract class AbstractScheduler@b@  implements Scheduler@b@{@b@  private static final Log log = LogFactory.getLog(MemoryBasedScheduler.class);@b@  protected JobQueue scheduleQueue = null;@b@  protected MainLoop mainLoop;@b@  protected Thread thread;@b@@b@  public AbstractScheduler()@b@  {@b@    this.mainLoop = null;@b@    this.thread = null;@b@  }@b@@b@  public void start()@b@  {@b@  }@b@@b@  public void stop()@b@  {@b@    if (getThread() != null)@b@    {@b@      getThread().interrupt();@b@    }@b@  }@b@@b@  public abstract JobEntry getJob(int paramInt)@b@    throws Exception;@b@@b@  public abstract void addJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public abstract void removeJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public abstract void updateJob(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public List listJobs()@b@  {@b@    return this.scheduleQueue.list();@b@  }@b@@b@  public synchronized Thread getThread()@b@  {@b@    return this.thread;@b@  }@b@@b@  private synchronized void clearThread()@b@  {@b@    this.thread = null;@b@  }@b@@b@  public synchronized void restart()@b@  {@b@    if (this.thread == null)@b@    {@b@      this.thread = new Thread(this.mainLoop, "scheduler");@b@@b@      this.thread.setDaemon(true);@b@      this.thread.start();@b@    }@b@    else@b@    {@b@      super.notify();@b@    }@b@  }@b@@b@  private synchronized JobEntry nextJob()@b@    throws Exception@b@  {@b@    try@b@    {@b@      while (!(Thread.interrupted()))@b@      {@b@        JobEntry je = this.scheduleQueue.getNext();@b@@b@        if (je == null)@b@        {@b@          super.wait();@b@        }@b@        else@b@        {@b@          long now = System.currentTimeMillis();@b@          long when = je.getNextRuntime();@b@@b@          if (when > now)@b@          {@b@            super.wait(when - now);@b@          }@b@          else@b@          {@b@            this.scheduleQueue.updateQueue(je);@b@@b@            return je;@b@          }@b@        }@b@      }@b@@b@    }@b@    catch (InterruptedException ex)@b@    {@b@    }@b@@b@    return null;@b@  }@b@@b@  static JobEntry access$000(AbstractScheduler x0)@b@    throws Exception@b@  {@b@    return x0.nextJob(); } @b@  static Log access$100() { return log; } @b@  static void access$200(AbstractScheduler x0) { x0.clearThread();@b@  }@b@@b@  protected class MainLoop@b@    implements Runnable@b@  {@b@    private final AbstractScheduler this$0;@b@@b@    public void run()@b@    {@b@      JobEntry je;@b@      try@b@      {@b@        while (true)@b@        {@b@          je = AbstractScheduler.access$000(this.this$0);@b@          if (je == null)@b@            break;@b@@b@          Runnable wt = new WorkerThread(je);@b@          Thread helper = new Thread(wt);@b@          helper.start();@b@        }@b@@b@      }@b@      catch (Exception e)@b@      {@b@        AbstractScheduler.access$100().error("Error running a Scheduled Job: " + e);@b@      }@b@      finally@b@      {@b@        AbstractScheduler.access$200(this.this$0);@b@      }@b@    }@b@  }@b@}
package org.apache.jetspeed.scheduler;@b@@b@import java.util.ArrayList;@b@import java.util.List;@b@import org.apache.commons.configuration.Configuration;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@@b@public class MemoryBasedScheduler extends AbstractScheduler@b@  implements Scheduler@b@{@b@  private static final Log log = LogFactory.getLog(MemoryBasedScheduler.class);@b@  private Configuration config;@b@@b@  public MemoryBasedScheduler(Configuration config)@b@    throws Exception@b@  {@b@    this.config = config;@b@  }@b@@b@  private Configuration getConfiguration()@b@  {@b@    return this.config;@b@  }@b@@b@  public void start()@b@  {@b@    try@b@    {@b@      int i;@b@      super.start();@b@      this.scheduleQueue = new JobQueue();@b@      this.mainLoop = new AbstractScheduler.MainLoop(this);@b@@b@      List jobProps = getConfiguration().getList("jobs");@b@      List jobs = new ArrayList();@b@@b@      if (!(jobProps.isEmpty()))@b@      {@b@        for (i = 0; i < jobProps.size(); ++i)@b@        {@b@          String jobName = (String)jobProps.get(i);@b@          String jobPrefix = "job." + jobName;@b@@b@          if (getConfiguration().getString(jobPrefix + ".ID", null) == null)@b@          {@b@            throw new Exception("There is an error in the properties file. \n" + jobPrefix + ".ID is not found.\n");@b@          }@b@@b@          int sec = getConfiguration().getInt(jobPrefix + ".SECOND", -1);@b@          int min = getConfiguration().getInt(jobPrefix + ".MINUTE", -1);@b@          int hr = getConfiguration().getInt(jobPrefix + ".HOUR", -1);@b@          int wkday = getConfiguration().getInt(jobPrefix + ".WEEKDAY", -1);@b@          int dayOfMonth = getConfiguration().getInt(jobPrefix + ".DAY_OF_MONTH", -1);@b@@b@          JobEntry je = new JobEntry(sec, min, hr, wkday, dayOfMonth, jobName);@b@@b@          jobs.add(je);@b@        }@b@@b@      }@b@@b@      if ((jobs != null) && (jobs.size() > 0))@b@      {@b@        this.scheduleQueue.batchLoad(jobs);@b@        restart();@b@      }@b@@b@    }@b@    catch (Exception e)@b@    {@b@      log.error("Cannot initialize SchedulerService!: ", e);@b@    }@b@  }@b@@b@  public void stop()@b@  {@b@    super.stop();@b@  }@b@@b@  public JobEntry getJob(int oid)@b@    throws Exception@b@  {@b@    JobEntry je = new JobEntry(-1, -1, -1, -1, -1, null);@b@@b@    return this.scheduleQueue.getJob(je);@b@  }@b@@b@  public void addJob(JobEntry je)@b@    throws Exception@b@  {@b@    this.scheduleQueue.add(je);@b@    restart();@b@  }@b@@b@  public void removeJob(JobEntry je)@b@    throws Exception@b@  {@b@    this.scheduleQueue.remove(je);@b@    restart();@b@  }@b@@b@  public void updateJob(JobEntry je)@b@    throws Exception@b@  {@b@    try@b@    {@b@      je.calcRunTime();@b@    }@b@    catch (Exception e)@b@    {@b@      log.error("Problem updating Scheduled Job: " + e);@b@    }@b@@b@    this.scheduleQueue.modify(je);@b@    restart();@b@  }@b@}

2.任务定义实现类

package org.apache.jetspeed.scheduler;@b@@b@public abstract class BaseJobEntry@b@{@b@  protected int jobId;@b@  protected int jobSecond;@b@  protected int jobMinute;@b@  protected int jobHour;@b@  protected int weekDay;@b@  protected int dayOfMonth;@b@  protected String task;@b@  protected String email;@b@@b@  public BaseJobEntry()@b@  {@b@    this.jobId = 0;@b@    this.jobSecond = -1;@b@    this.jobMinute = -1;@b@    this.jobHour = -1;@b@    this.weekDay = -1;@b@    this.dayOfMonth = -1;@b@  }@b@@b@  public int getJobId()@b@  {@b@    return this.jobId;@b@  }@b@@b@  public void setJobId(int v)@b@  {@b@    this.jobId = v;@b@  }@b@@b@  public int getSecond()@b@  {@b@    return this.jobSecond;@b@  }@b@@b@  public void setSecond(int v)@b@  {@b@    this.jobSecond = v;@b@  }@b@@b@  public int getMinute()@b@  {@b@    return this.jobMinute;@b@  }@b@@b@  public void setMinute(int v)@b@  {@b@    this.jobMinute = v;@b@  }@b@@b@  public int getHour()@b@  {@b@    return this.jobHour;@b@  }@b@@b@  public void setHour(int v)@b@  {@b@    this.jobHour = v;@b@  }@b@@b@  public int getWeekDay()@b@  {@b@    return this.weekDay;@b@  }@b@@b@  public void setWeekDay(int v)@b@  {@b@    this.weekDay = v;@b@  }@b@@b@  public int getDayOfMonth()@b@  {@b@    return this.dayOfMonth;@b@  }@b@@b@  public void setDayOfMonth(int v)@b@  {@b@    this.dayOfMonth = v;@b@  }@b@@b@  public String getTask()@b@  {@b@    return this.task;@b@  }@b@@b@  public void setTask(String v)@b@  {@b@    this.task = v;@b@  }@b@@b@  public String getEmail()@b@  {@b@    return this.email;@b@  }@b@@b@  public void setEmail(String v)@b@  {@b@    this.email = v;@b@  }@b@}
package org.apache.jetspeed.scheduler;@b@@b@import java.util.Calendar;@b@import java.util.Date;@b@@b@public class JobEntry extends BaseJobEntry@b@  implements Comparable@b@{@b@  private boolean jobIsActive = false;@b@  private long runtime = 0L;@b@  private static final int SECOND = 0;@b@  private static final int MINUTE = 1;@b@  private static final int WEEK_DAY = 2;@b@  private static final int DAY_OF_MONTH = 3;@b@  private static final int DAILY = 4;@b@@b@  public JobEntry()@b@  {@b@  }@b@@b@  public JobEntry(int sec, int min, int hour, int wd, int day_mo, String task)@b@    throws Exception@b@  {@b@    if ((task == null) || (task.length() == 0))@b@    {@b@      throw new Exception("Error in JobEntry. Bad Job parameter. Task not set.");@b@    }@b@@b@    setSecond(sec);@b@    setMinute(min);@b@    setHour(hour);@b@    setWeekDay(wd);@b@    setDayOfMonth(day_mo);@b@    setTask(task);@b@@b@    calcRunTime();@b@  }@b@@b@  public int compareTo(Object je)@b@  {@b@    int result = -1;@b@    if (je instanceof JobEntry)@b@    {@b@      if (this.jobId == ((JobEntry)je).getJobId())@b@      {@b@        return 0;@b@      }@b@@b@      if (this.jobId > ((JobEntry)je).getJobId())@b@      {@b@        return 1;@b@      }@b@@b@    }@b@@b@    return result;@b@  }@b@@b@  public void setActive(boolean isActive)@b@  {@b@    this.jobIsActive = isActive;@b@  }@b@@b@  public boolean isActive()@b@  {@b@    return this.jobIsActive;@b@  }@b@@b@  public long getNextRuntime()@b@  {@b@    return this.runtime;@b@  }@b@@b@  public String getNextRunAsString()@b@  {@b@    return new Date(this.runtime).toString();@b@  }@b@@b@  public void calcRunTime()@b@    throws Exception@b@  {@b@    Calendar schedrun = Calendar.getInstance();@b@    Calendar now = Calendar.getInstance();@b@@b@    switch (evaluateJobType())@b@    {@b@    case 0:@b@      schedrun.add(13, getSecond());@b@      this.runtime = schedrun.getTime().getTime();@b@      break;@b@    case 1:@b@      schedrun.add(13, getSecond());@b@      schedrun.add(12, getMinute());@b@      this.runtime = schedrun.getTime().getTime();@b@      break;@b@    case 2:@b@      schedrun.set(13, getSecond());@b@      schedrun.set(12, getMinute());@b@      schedrun.set(11, getHour());@b@      schedrun.set(7, getWeekDay());@b@@b@      if (now.before(schedrun))@b@      {@b@        this.runtime = schedrun.getTime().getTime(); return;@b@      }@b@@b@      schedrun.add(7, 7);@b@      this.runtime = schedrun.getTime().getTime();@b@@b@      break;@b@    case 3:@b@      schedrun.set(13, getSecond());@b@      schedrun.set(12, getMinute());@b@      schedrun.set(11, getHour());@b@      schedrun.set(5, getDayOfMonth());@b@@b@      if (now.before(schedrun))@b@      {@b@        this.runtime = schedrun.getTime().getTime(); return;@b@      }@b@@b@      schedrun.add(2, 1);@b@      this.runtime = schedrun.getTime().getTime();@b@@b@      break;@b@    case 4:@b@      schedrun.set(13, getSecond());@b@      schedrun.set(12, getMinute());@b@      schedrun.set(11, getHour());@b@@b@      if (now.before(schedrun))@b@      {@b@        this.runtime = schedrun.getTime().getTime(); return;@b@      }@b@@b@      schedrun.add(11, 24);@b@      this.runtime = schedrun.getTime().getTime();@b@    }@b@  }@b@@b@  private int evaluateJobType()@b@    throws Exception@b@  {@b@    if (getDayOfMonth() < 0)@b@    {@b@      if (getWeekDay() < 0)@b@      {@b@        if (getHour() < 0)@b@        {@b@          if (getMinute() < 0)@b@          {@b@            if (getSecond() < 0)@b@              throw new Exception("Error in JobEntry. Bad Job parameter.");@b@@b@            return 0;@b@          }@b@@b@          if ((getMinute() < 0) || (getSecond() < 0))@b@            throw new Exception("Error in JobEntry. Bad Job parameter.");@b@@b@          return 1;@b@        }@b@@b@        if ((getMinute() < 0) || (getHour() < 0) || (getSecond() < 0))@b@          throw new Exception("Error in JobEntry. Bad Job parameter.");@b@@b@        return 4;@b@      }@b@@b@      if ((getMinute() < 0) || (getHour() < 0) || (getSecond() < 0))@b@        throw new Exception("Error in JobEntry. Bad Job parameter.");@b@@b@      return 2;@b@    }@b@@b@    if ((getMinute() < 0) || (getHour() < 0))@b@      throw new Exception("Error in JobEntry. Bad Job parameter.");@b@@b@    return 3;@b@  }@b@}
package org.apache.jetspeed.scheduler;@b@@b@import java.util.Collections;@b@import java.util.Comparator;@b@import java.util.List;@b@import java.util.Vector;@b@@b@public class JobQueue@b@{@b@  private Vector queue = null;@b@@b@  public JobQueue()@b@    throws Exception@b@  {@b@    this.queue = new Vector(10);@b@  }@b@@b@  public JobEntry getNext()@b@  {@b@    if (this.queue.size() > 0)@b@    {@b@      return ((JobEntry)this.queue.elementAt(0));@b@    }@b@@b@    return null;@b@  }@b@@b@  public JobEntry getJob(JobEntry je)@b@  {@b@    int index = -1;@b@@b@    if (je != null)@b@    {@b@      index = this.queue.indexOf(je);@b@    }@b@@b@    if (index < 0)@b@    {@b@      return null;@b@    }@b@@b@    return ((JobEntry)this.queue.elementAt(index));@b@  }@b@@b@  public Vector list()@b@  {@b@    if ((this.queue != null) && (this.queue.size() > 0))@b@    {@b@      return ((Vector)this.queue.clone());@b@    }@b@@b@    return null;@b@  }@b@@b@  public synchronized void add(JobEntry je)@b@  {@b@    this.queue.addElement(je);@b@    sortQueue();@b@  }@b@@b@  public synchronized void batchLoad(List jobEntries)@b@  {@b@    if (jobEntries != null)@b@    {@b@      this.queue.addAll(jobEntries);@b@      sortQueue();@b@    }@b@  }@b@@b@  public synchronized void remove(JobEntry je)@b@  {@b@    this.queue.removeElement(je);@b@    sortQueue();@b@  }@b@@b@  public synchronized void modify(JobEntry je)@b@  {@b@    sortQueue();@b@  }@b@@b@  public synchronized void updateQueue(JobEntry je)@b@    throws Exception@b@  {@b@    je.calcRunTime();@b@    sortQueue();@b@  }@b@@b@  private void sortQueue()@b@  {@b@    Comparator aComparator = new Comparator(this) {@b@      private final JobQueue this$0;@b@@b@      public int compare(, Object o2) {@b@        Long time1 = new Long(((JobEntry)o1).getNextRuntime());@b@        Long time2 = new Long(((JobEntry)o2).getNextRuntime());@b@        return time1.compareTo(time2);@b@      }@b@@b@    };@b@    Collections.sort(this.queue, aComparator);@b@  }@b@}
package org.apache.jetspeed.scheduler;@b@@b@public abstract class ScheduledJob@b@{@b@  public abstract void run(JobEntry paramJobEntry)@b@    throws Exception;@b@@b@  public void execute(JobEntry job)@b@    throws Exception@b@  {@b@    run(job);@b@  }@b@}

3. 调度任务线程引擎类

package org.apache.jetspeed.scheduler;@b@@b@public class WorkerThread@b@  implements Runnable@b@{@b@  private JobEntry je = null;@b@@b@  public WorkerThread(JobEntry je)@b@  {@b@    this.je = je;@b@  }@b@@b@  public void run()@b@  {@b@    if ((this.je == null) || (this.je.isActive()))@b@    {@b@      return;@b@    }@b@@b@    try@b@    {@b@      if (!(this.je.isActive()))@b@      {@b@        this.je.setActive(true);@b@        logStateChange("started");@b@@b@        String className = this.je.getTask();@b@@b@        ScheduledJob sc = (ScheduledJob)Class.forName(className).newInstance();@b@        sc.execute(this.je);@b@      }@b@@b@    }@b@    catch (Exception e)@b@    {@b@    }@b@    finally@b@    {@b@      if (this.je.isActive())@b@      {@b@        this.je.setActive(false);@b@        logStateChange("completed");@b@      }@b@    }@b@  }@b@@b@  private final void logStateChange(String state)@b@  {@b@  }@b@}
<<热门下载>>