| 1 |
public class SchedulerServiceBean implements SchedulerService { private static final String SCHEDULER = "SY_SCHEDULER"; private static final Logger logger = LoggerFactory .getLogger(SchedulerServiceBean.class); private Scheduler sched; @PostConstruct void doStart() { try { sched = new StdSchedulerFactory( "ru/deltasolutions/common/switchyard/scheduler/quartz.properties") .getScheduler(); sched.start(); logger.info("Scheduler started (" + this.hashCode() + ")"); } catch (Throwable t) { logger.error("Cannot instantiate scheduler", t); } } @PreDestroy void doStop() { try { logger.info("Stopping scheduler, waiting for jobs to be completed..."); sched.shutdown(true); logger.info("Scheduler stopped"); } catch (Throwable t) { logger.error("Cannot stop scheduler", t); } } @Override public IdResponse scheduleJob(ScheduledJobRequest request) { IdResponse response = new IdResponse(); String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] scheduleJob() started", transactionId, initiator); logger.debug("[{}][{}] scheduleJob() started with request={}", transactionId, initiator, request); ScheduledJob job = request.getJob(); String jobId = UUID.randomUUID().toString(); job.setId(jobId); job.setCreationDate(new Date(System.currentTimeMillis())); JobDataMap jdm = getJdmFromJob(job); jdm.put(JdmKey.TRANSACTION_ID, transactionId); jdm.put(JdmKey.INITIATOR, initiator); try { String jobGroup = job.getSystem(); JobDetail jobDetail = JobBuilder.newJob(HttpClientJob.class) .withIdentity(jobId, jobGroup).usingJobData(jdm).build(); String schedType = job.getType(); Trigger trigger = null; if (schedType.equals(ScheduleType.PERIODIC)) { // cron trigger trigger = buildCronTrigger(job); } else if (schedType.equals(ScheduleType.SINGLE)) { // simple trigger trigger = buildSimpleTrigger(job); } else { throw new Exception("Wrong schedule type"); } sched.scheduleJob(jobDetail, trigger); response.setId(jobId); response.setErrorCode("OK"); return response; } catch (ParseException e) { logger.error("[{}][{}] Got parse exception while in scheduleJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-003"); response.setErrorMessage("Parse error: " + e.getMessage()); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler exception while in scheduleJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in scheduleJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] scheduleJob() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug("[{}][{}] scheduleJob() finished with response={}", transactionId, initiator, response); } } @Override public ScheduledJobResponse getJobByIdAndSystem(JobKeyRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] getJobById() started", transactionId, initiator); logger.debug("[{}][{}] getJobById() started with request={}", transactionId, initiator, request); ScheduledJobResponse response = new ScheduledJobResponse(); JobKey key = new JobKey(request.getId(), request.getSystem()); try { JobDetail jd = sched.getJobDetail(key); if (jd != null) { JobDataMap jdm = jd.getJobDataMap(); ScheduledJob job = getJobFromJdm(jdm); response.setJob(job); } response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler exception while in getJobById()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in getJobById()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } } @Override public BaseResponse unscheduleJob(JobKeyRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] unscheduleJob() started", transactionId, initiator); logger.debug("[{}][{}] unscheduleJob() started with request={}", transactionId, initiator, request); BaseResponse response = new BaseResponse(); JobKey key = new JobKey(request.getId(), request.getSystem()); try { sched.deleteJob(key); response.setErrorCode("OK"); response.setErrorMessage(null); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in unscheduleJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in unscheduleJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] unscheduleJob() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug("[{}][{}] unscheduleJob() finished with response={}", transactionId, initiator, response); } } @SuppressWarnings("unchecked") @Override public BaseResponse updateJob(ScheduledJobRequest request) { BaseResponse response = new BaseResponse(); String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] updateJob() started", transactionId, initiator); logger.debug("[{}][{}] updateJob() started with request={}", transactionId, initiator, request); ScheduledJob newJob = request.getJob(); newJob.setLastUpdDate(new Date(System.currentTimeMillis())); JobKey key = new JobKey(newJob.getId(), newJob.getSystem()); try { // get old job data JobDetail oldJd = sched.getJobDetail(key); JobDataMap oldJdm = oldJd.getJobDataMap(); ScheduledJob oldJob = getJobFromJdm(oldJdm); // update trigger if needed if (isUpdateTrigger(oldJob, newJob)) { newJob.setLastTrigUpdDate(newJob.getLastUpdDate()); List<Trigger> triggers = (List<Trigger>) sched .getTriggersOfJob(key); Trigger oldTrigger = triggers.get(0);// job has only one trigger String newSchedType = newJob.getType(); Trigger newTrigger = null; if (newSchedType.equals(ScheduleType.PERIODIC)) { newTrigger = buildCronTrigger(newJob); } else if (newSchedType.equals(ScheduleType.SINGLE)) { newTrigger = buildSimpleTrigger(newJob); } sched.rescheduleJob(oldTrigger.getKey(), newTrigger); } // update job - replace the old one JobDataMap newJdm = getJdmFromJob(newJob); JobDetail newJd = JobBuilder.newJob(HttpClientJob.class) .withIdentity(key).usingJobData(newJdm).build(); sched.addJob(newJd, true, true); response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in updateJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (ParseException e) { logger.error("[{}][{}] Got parse exception while in updateJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-003"); response.setErrorMessage("Parse error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in updateJob()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] updateJob() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug("[{}][{}] updateJob() finished with response={}", transactionId, initiator, response); } } @Override public JobListResponse getAllJobs(BaseRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] getAllJobs() started", transactionId, initiator); logger.debug("[{}][{}] getAllJobs() started with request={}", transactionId, initiator, request); JobListResponse response = new JobListResponse(); List<ScheduledJob> jobs = new ArrayList<ScheduledJob>(); try { if (sched.getJobGroupNames() != null && !sched.getJobGroupNames().isEmpty()) { for (String group : sched.getJobGroupNames()) { for (JobKey key : sched.getJobKeys(GroupMatcher .jobGroupEquals(group))) { JobDetail jd = sched.getJobDetail(key); JobDataMap jdm = jd.getJobDataMap(); ScheduledJob job = getJobFromJdm(jdm); jobs.add(job); } } } response.setJobs(jobs); response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in getAllJobs()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in getAllJobs()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] getAllJobs() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug("[{}][{}] getAllJobs() finished with response={}", transactionId, initiator, response); } } @Override public JobListResponse getAllJobsForSystem(SystemRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] getAllJobsForSystem() started", transactionId, initiator); logger.debug("[{}][{}] getAllJobsForSystem() started with request={}", transactionId, initiator, request); JobListResponse response = new JobListResponse(); List<ScheduledJob> jobs = new ArrayList<ScheduledJob>(); String group = request.getSystem(); try { for (JobKey key : sched.getJobKeys(GroupMatcher .jobGroupEquals(group))) { JobDetail jd = sched.getJobDetail(key); JobDataMap jdm = jd.getJobDataMap(); ScheduledJob job = getJobFromJdm(jdm); jobs.add(job); } response.setJobs(jobs); response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in getAllJobsForSystem()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error( "[{}][{}] Got exception while in getAllJobsForSystem()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] getAllJobsForSystem() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug( "[{}][{}] getAllJobsForSystem() finished with response={}", transactionId, initiator, response); } } @Override public JobListResponsePaged getAllJobsForSystemPaged(PagedRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); Integer startingAt = request.getStartingAt(); Integer count = request.getCount(); logger.info("[{}][{}] getAllJobsForSystemPaged() started", transactionId, initiator); logger.debug( "[{}][{}] getAllJobsForSystemPaged() started with request={}", transactionId, initiator, request); JobListResponsePaged response = new JobListResponsePaged(); List<ScheduledJob> jobs = new ArrayList<ScheduledJob>(); String group = request.getSystem(); try { Set<JobKey> keys = sched.getJobKeys(GroupMatcher .jobGroupEquals(group)); response.setCount(keys.size()); int i = 0; for (JobKey key : keys) { if (i >= startingAt && i < startingAt + count) { JobDetail jd = sched.getJobDetail(key); JobDataMap jdm = jd.getJobDataMap(); ScheduledJob job = getJobFromJdm(jdm); jobs.add(job); } i++; } response.setJobs(jobs); response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in getAllJobsForSystemPaged()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error( "[{}][{}] Got exception while in getAllJobsForSystemPaged()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] getAllJobsForSystemPaged() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug( "[{}][{}] getAllJobsForSystemPaged() finished with response={}", transactionId, initiator, response); } } @Override public BaseResponse killAllJobsForSystem(SystemRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] killAllJobsForSystem() started", transactionId, initiator); logger.debug("[{}][{}] killAllJobsForSystem() started with request={}", transactionId, initiator, request); BaseResponse response = new BaseResponse(); String group = request.getSystem(); try { for (JobKey key : sched.getJobKeys(GroupMatcher .jobGroupEquals(group))) { sched.deleteJob(key); } response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in killAllJobsForSystem()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error( "[{}][{}] Got exception while in killAllJobsForSystem()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] killAllJobsForSystem() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug( "[{}][{}] killAllJobsForSystem() finished with response={}", transactionId, initiator, response); } } @Override public BaseResponse killAllJobs(BaseRequest request) { String transactionId = request.getTransactionId(); String initiator = request.getInitiator(); logger.info("[{}][{}] killAllJobs() started", transactionId, initiator); logger.debug("[{}][{}] killAllJobs() started with request={}", transactionId, initiator, request); BaseResponse response = new BaseResponse(); try { for (String group : sched.getJobGroupNames()) { for (JobKey key : sched.getJobKeys(GroupMatcher .jobGroupEquals(group))) { sched.deleteJob(key); } } response.setErrorCode("OK"); return response; } catch (SchedulerException e) { logger.error( "[{}][{}] Got scheduler internal error while in killAllJobs()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-002"); response.setErrorMessage("Scheduler error: " + e.getMessage()); return response; } catch (Exception e) { logger.error("[{}][{}] Got exception while in killAllJobs()", transactionId, initiator, e); response.setErrorCode(SCHEDULER + "-001"); response.setErrorMessage("Internal error: " + e.getMessage()); return response; } finally { logger.info( "[{}][{}] killAllJobs() finished with errorCode={} and errorMessage={}", transactionId, initiator, response.getErrorCode(), response.getErrorMessage()); logger.debug("[{}][{}] killAllJobs() finished with response={}", transactionId, initiator, response); } } private JobDataMap getJdmFromJob(ScheduledJob job) { JobDataMap jdm = new JobDataMap(); jdm.put(JdmKey.ID, job.getId()); jdm.put(JdmKey.TYPE, job.getType()); jdm.put(JdmKey.CRON, job.getCron()); jdm.put(JdmKey.MILLIS, job.getMillis()); jdm.put(JdmKey.PATH, job.getPath()); jdm.put(JdmKey.JSON, job.getJson()); jdm.put(JdmKey.SYSTEM, job.getSystem()); jdm.put(JdmKey.DISPLAY_NAME, job.getDisplayName()); jdm.put(JdmKey.DESCRIPTION, job.getDescription()); jdm.put(JdmKey.AUTHOR, job.getAuthor()); jdm.put(JdmKey.CREATION_DATE, job.getCreationDate()); jdm.put(JdmKey.LAST_UPD_DATE, job.getLastUpdDate()); jdm.put(JdmKey.LAST_TRIG_UPD_DATE, job.getLastTrigUpdDate()); return jdm; } private ScheduledJob getJobFromJdm(JobDataMap jdm) { ScheduledJob job = new ScheduledJob(); job.setId(jdm.getString(JdmKey.ID)); job.setType(jdm.getString(JdmKey.TYPE)); job.setCron(jdm.getString(JdmKey.CRON)); job.setMillis(jdm.getString(JdmKey.MILLIS)); job.setPath(jdm.getString(JdmKey.PATH)); job.setJson(jdm.getString(JdmKey.JSON)); job.setSystem(jdm.getString(JdmKey.SYSTEM)); job.setDisplayName(jdm.getString(JdmKey.DISPLAY_NAME)); job.setDescription(jdm.getString(JdmKey.DESCRIPTION)); job.setAuthor(jdm.getString(JdmKey.AUTHOR)); job.setCreationDate((Date) jdm.get(JdmKey.CREATION_DATE)); job.setLastUpdDate((Date) jdm.get(JdmKey.LAST_UPD_DATE)); job.setLastTrigUpdDate((Date) jdm.get(JdmKey.LAST_TRIG_UPD_DATE)); return job; } private Trigger buildCronTrigger(ScheduledJob job) throws ParseException { Trigger trigger = null; String identity = "Cron trigger-" + job.getId() + "_" + System.currentTimeMillis(); String group = job.getSystem(); CronExpression cron = new CronExpression(job.getCron()); trigger = TriggerBuilder.newTrigger().withIdentity(identity, group) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule(cron)).build(); return trigger; } private Trigger buildSimpleTrigger(ScheduledJob job) { Trigger trigger = null; String identity = "Simple trigger-" + job.getId() + "_" + System.currentTimeMillis(); String group = job.getSystem(); Long millis = Long.parseLong(job.getMillis()); Long trigTime = (job.getLastTrigUpdDate() == null) ? job .getCreationDate().getTime() : job.getLastTrigUpdDate() .getTime(); Date start = new Date(trigTime + millis); trigger = TriggerBuilder.newTrigger().withIdentity(identity, group) .startAt(start).build(); return trigger; } private boolean isUpdateTrigger(ScheduledJob oldJob, ScheduledJob newJob) { String oldSchedType = oldJob.getType(); String newSchedType = newJob.getType(); boolean result = false; if (oldSchedType.equals(ScheduleType.PERIODIC) && newSchedType.equals(ScheduleType.SINGLE)) { result = true; } else if (oldSchedType.equals(ScheduleType.SINGLE) && newSchedType.equals(ScheduleType.PERIODIC)) { result = true; } else if (oldSchedType.equals(ScheduleType.PERIODIC) && newSchedType.equals(ScheduleType.PERIODIC)) { String oldCron = oldJob.getCron(); String newCron = newJob.getCron(); result = (!oldCron.equals(newCron)) ? true : false; return result; } else if (oldSchedType.equals(ScheduleType.SINGLE) && newSchedType.equals(ScheduleType.SINGLE)) { String oldMillis = oldJob.getMillis(); String newMillis = newJob.getMillis(); result = (!oldMillis.equals(newMillis)) ? true : false; return result; } return result; } } |
Комментарии