|
Author: adrianc
Date: Sat Aug 4 08:57:19 2012 New Revision: 1369291 URL: http://svn.apache.org/viewvc?rev=1369291&view=rev Log: JobManager.java and JobPoller.java code formatting - no functional change. Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1369291&r1=1369290&r2=1369291&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 4 08:57:19 2012 @@ -56,12 +56,48 @@ import org.ofbiz.service.config.ServiceC */ public class JobManager { - public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); public static final String module = JobManager.class.getName(); - + public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); + public static final Map<String, Object> updateFields = UtilMisc.<String, Object> toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); private static final Map<String, JobManager> registeredManagers = new HashMap<String, JobManager>(); + public synchronized static JobManager getInstance(Delegator delegator, boolean enabled) { + JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); + if (jm == null) { + jm = new JobManager(delegator, enabled); + JobManager.registeredManagers.put(delegator.getDelegatorName(), jm); + } + return jm; + } + + /** gets the recurrence info object for a job. */ + public static RecurrenceInfo getRecurrenceInfo(GenericValue job) { + try { + if (job != null && !UtilValidate.isEmpty(job.getString("recurrenceInfoId"))) { + if (job.get("cancelDateTime") != null) { + // cancel has been flagged, no more recurrence + return null; + } + GenericValue ri = job.getRelatedOne("RecurrenceInfo", false); + + if (ri != null) { + return new RecurrenceInfo(ri); + } else { + return null; + } + } else { + return null; + } + } catch (GenericEntityException e) { + e.printStackTrace(); + Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module); + } catch (RecurrenceInfoException re) { + re.printStackTrace(); + Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module); + } + return null; + } + protected Delegator delegator; protected JobPoller jp; @@ -73,20 +109,15 @@ public class JobManager { jp = new JobPoller(this, enabled); } - public synchronized static JobManager getInstance(Delegator delegator, boolean enabled) { - JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); - if (jm == null) { - jm = new JobManager(delegator, enabled); - JobManager.registeredManagers.put(delegator.getDelegatorName(), jm); - } - return jm; + @Override + public void finalize() throws Throwable { + this.shutdown(); + super.finalize(); } - /** Queues a Job to run now. */ - public void runJob(Job job) throws JobManagerException { - if (job.isValid()) { - jp.queueNow(job); - } + /** Returns the Delegator. */ + public Delegator getDelegator() { + return this.delegator; } /** Returns the LocalDispatcher. */ @@ -95,61 +126,53 @@ public class JobManager { return thisDispatcher; } - /** Returns the Delegator. */ - public Delegator getDelegator() { - return this.delegator; + /** + * Get a List of each threads current state. + * + * @return List containing a Map of each thread's state. + */ + public Map<String, Object> getPoolState() { + return jp.getPoolState(); } public synchronized List<Job> poll() { List<Job> poll = FastList.newInstance(); - // sort the results by time List<String> order = UtilMisc.toList("runTime"); - // basic query - List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, - UtilDateTime.nowTimestamp()), EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), - EntityCondition.makeCondition("cancelDateTime", EntityOperator.EQUALS, null), - EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null)); - + List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), + EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), EntityCondition.makeCondition("cancelDateTime", + EntityOperator.EQUALS, null), EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null)); // limit to just defined pools List<String> pools = ServiceConfigUtil.getRunPools(); List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); if (pools != null) { - for (String poolName: pools) { + for (String poolName : pools) { poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName)); } } - // make the conditions EntityCondition baseCondition = EntityCondition.makeCondition(expressions); EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition)); - // we will loop until we have no more to do boolean pollDone = false; - while (!pollDone) { boolean beganTransaction = false; - try { beganTransaction = TransactionUtil.begin(); if (!beganTransaction) { Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); return null; } - List<Job> localPoll = FastList.newInstance(); - // first update the jobs w/ this instance running information delegator.storeByCondition("JobSandbox", updateFields, mainCondition); - // now query all the 'queued' jobs for this instance List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); - //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order); - + // jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order); if (UtilValidate.isNotEmpty(jobEnt)) { - for (GenericValue v: jobEnt) { + for (GenericValue v : jobEnt) { DispatchContext dctx = getDispatcher().getDispatchContext(); if (dctx == null) { Debug.logError("Unable to locate DispatchContext object; not running job!", module); @@ -166,7 +189,6 @@ public class JobManager { } else { pollDone = true; } - // nothing should go wrong at this point, so add to the general list poll.addAll(localPoll); } catch (Throwable t) { @@ -196,21 +218,18 @@ public class JobManager { public synchronized void reloadCrashedJobs() { String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); List<GenericValue> crashed = null; - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId)); exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs); - try { crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false); } catch (GenericEntityException e) { Debug.logError(e, "Unable to load crashed jobs", module); } - if (UtilValidate.isNotEmpty(crashed)) { try { int rescheduled = 0; - for (GenericValue job: crashed) { + for (GenericValue job : crashed) { Timestamp now = UtilDateTime.nowTimestamp(); Debug.log("Scheduling Job : " + job, module); @@ -234,25 +253,39 @@ public class JobManager { rescheduled++; } - - if (Debug.infoOn()) Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); + if (Debug.infoOn()) + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); } catch (GenericEntityException e) { Debug.logError(e, module); } - } else { - if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module); + if (Debug.infoOn()) + Debug.logInfo("No crashed jobs to re-schedule", module); + } + } + + /** Queues a Job to run now. */ + public void runJob(Job job) throws JobManagerException { + if (job.isValid()) { + jp.queueNow(job); } } /** * Schedule a job to start at a specific time with specific recurrence info - *@param serviceName The name of the service to invoke - *@param context The context for the service - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param count The number of times to repeat + * + * @param serviceName + * The name of the service to invoke + *@param context + * The context for the service + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param count + * The number of times to repeat */ public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count) throws JobManagerException { schedule(serviceName, context, startTime, frequency, interval, count, 0); @@ -260,12 +293,41 @@ public class JobManager { /** * Schedule a job to start at a specific time with specific recurrence info - *@param serviceName The name of the service to invoke - *@param context The context for the service - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param endTime The time in milliseconds the service should expire + * + * @param serviceName + * The name of the service to invoke + *@param context + * The context for the service + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param count + * The number of times to repeat + *@param endTime + * The time in milliseconds the service should expire + */ + public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException { + schedule(null, serviceName, context, startTime, frequency, interval, count, endTime); + } + + /** + * Schedule a job to start at a specific time with specific recurrence info + * + * @param serviceName + * The name of the service to invoke + *@param context + * The context for the service + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param endTime + * The time in milliseconds the service should expire */ public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, long endTime) throws JobManagerException { schedule(serviceName, context, startTime, frequency, interval, -1, endTime); @@ -273,52 +335,75 @@ public class JobManager { /** * Schedule a job to start at a specific time with specific recurrence info - *@param serviceName The name of the service to invoke - *@param context The context for the service - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param count The number of times to repeat - *@param endTime The time in milliseconds the service should expire + * + * @param poolName + * The name of the pool to run the service from + *@param serviceName + * The name of the service to invoke + *@param context + * The context for the service + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param count + * The number of times to repeat + *@param endTime + * The time in milliseconds the service should expire */ - public void schedule(String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException { - schedule(null, serviceName, context, startTime, frequency, interval, count, endTime); + public void schedule(String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, + int interval, int count, long endTime) throws JobManagerException { + schedule(null, null, serviceName, context, startTime, frequency, interval, count, endTime, -1); } /** * Schedule a job to start at a specific time with specific recurrence info - *@param poolName The name of the pool to run the service from - *@param serviceName The name of the service to invoke - *@param context The context for the service - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param count The number of times to repeat - *@param endTime The time in milliseconds the service should expire + * + * @param poolName + * The name of the pool to run the service from + *@param serviceName + * The name of the service to invoke + *@param dataId + * The persisted context (RuntimeData.runtimeDataId) + *@param startTime + * The time in milliseconds the service should run */ - public void schedule(String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime) throws JobManagerException { - schedule(null, null, serviceName, context, startTime, frequency, interval, count, endTime, -1); + public void schedule(String poolName, String serviceName, String dataId, long startTime) throws JobManagerException { + schedule(null, poolName, serviceName, dataId, startTime, -1, 0, 1, 0, -1); } /** * Schedule a job to start at a specific time with specific recurrence info - *@param jobName The name of the job - *@param poolName The name of the pool to run the service from - *@param serviceName The name of the service to invoke - *@param context The context for the service - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param count The number of times to repeat - *@param endTime The time in milliseconds the service should expire - *@param maxRetry The max number of retries on failure (-1 for no max) + * + * @param jobName + * The name of the job + *@param poolName + * The name of the pool to run the service from + *@param serviceName + * The name of the service to invoke + *@param context + * The context for the service + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param count + * The number of times to repeat + *@param endTime + * The time in milliseconds the service should expire + *@param maxRetry + * The max number of retries on failure (-1 for no max) */ - public void schedule(String jobName, String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { + public void schedule(String jobName, String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, + int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { if (delegator == null) { Debug.logWarning("No delegator referenced; cannot schedule job.", module); return; } - // persist the context String dataId = null; try { @@ -333,41 +418,40 @@ public class JobManager { } catch (IOException ioe) { throw new JobManagerException(ioe.getMessage(), ioe); } - // schedule the job schedule(jobName, poolName, serviceName, dataId, startTime, frequency, interval, count, endTime, maxRetry); } /** * Schedule a job to start at a specific time with specific recurrence info - *@param poolName The name of the pool to run the service from - *@param serviceName The name of the service to invoke - *@param dataId The persisted context (RuntimeData.runtimeDataId) - *@param startTime The time in milliseconds the service should run - */ - public void schedule(String poolName, String serviceName, String dataId, long startTime) throws JobManagerException { - schedule(null, poolName, serviceName, dataId, startTime, -1, 0, 1, 0, -1); - } - - /** - * Schedule a job to start at a specific time with specific recurrence info - *@param jobName The name of the job - *@param poolName The name of the pool to run the service from - *@param serviceName The name of the service to invoke - *@param dataId The persisted context (RuntimeData.runtimeDataId) - *@param startTime The time in milliseconds the service should run - *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) - *@param interval The interval of the frequency recurrence - *@param count The number of times to repeat - *@param endTime The time in milliseconds the service should expire - *@param maxRetry The max number of retries on failure (-1 for no max) + * + * @param jobName + * The name of the job + *@param poolName + * The name of the pool to run the service from + *@param serviceName + * The name of the service to invoke + *@param dataId + * The persisted context (RuntimeData.runtimeDataId) + *@param startTime + * The time in milliseconds the service should run + *@param frequency + * The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc) + *@param interval + * The interval of the frequency recurrence + *@param count + * The number of times to repeat + *@param endTime + * The time in milliseconds the service should expire + *@param maxRetry + * The max number of retries on failure (-1 for no max) */ - public void schedule(String jobName, String poolName, String serviceName, String dataId, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { + public void schedule(String jobName, String poolName, String serviceName, String dataId, long startTime, int frequency, int interval, + int count, long endTime, int maxRetry) throws JobManagerException { if (delegator == null) { Debug.logWarning("No delegator referenced; cannot schedule job.", module); return; } - // create the recurrence String infoId = null; if (frequency > -1 && count != 0) { @@ -378,28 +462,23 @@ public class JobManager { throw new JobManagerException(e.getMessage(), e); } } - // set the persisted fields if (UtilValidate.isEmpty(jobName)) { jobName = Long.toString((new Date().getTime())); } - Map<String, Object> jFields = UtilMisc.<String, Object>toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime), + Map<String, Object> jFields = UtilMisc.<String, Object> toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime), "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId); - // set the pool ID if (UtilValidate.isNotEmpty(poolName)) { jFields.put("poolId", poolName); } else { jFields.put("poolId", ServiceConfigUtil.getSendPool()); } - // set the loader name jFields.put("loaderName", delegator.getDelegatorName()); - // set the max retry jFields.put("maxRetry", Long.valueOf(maxRetry)); jFields.put("currentRetryCount", new Long(0)); - // create the value and store GenericValue jobV; try { @@ -410,14 +489,6 @@ public class JobManager { } } - /** - * Get a List of each threads current state. - * @return List containing a Map of each thread's state. - */ - public Map<String, Object> getPoolState() { - return jp.getPoolState(); - } - /** Close out the scheduler thread. */ public void shutdown() { if (jp != null) { @@ -428,38 +499,4 @@ public class JobManager { Debug.logInfo("JobManager stopped.", module); } - @Override - public void finalize() throws Throwable { - this.shutdown(); - super.finalize(); - } - - /** gets the recurrence info object for a job. */ - public static RecurrenceInfo getRecurrenceInfo(GenericValue job) { - try { - if (job != null && !UtilValidate.isEmpty(job.getString("recurrenceInfoId"))) { - if (job.get("cancelDateTime") != null) { - // cancel has been flagged, no more recurrence - return null; - } - GenericValue ri = job.getRelatedOne("RecurrenceInfo", false); - - if (ri != null) { - return new RecurrenceInfo(ri); - } else { - return null; - } - } else { - return null; - } - } catch (GenericEntityException e) { - e.printStackTrace(); - Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module); - } catch (RecurrenceInfoException re) { - re.printStackTrace(); - Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module); - } - return null; - } - } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1369291&r1=1369290&r2=1369291&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 4 08:57:19 2012 @@ -38,7 +38,6 @@ import org.apache.commons.lang.math.Numb public class JobPoller implements Runnable { public static final String module = JobPoller.class.getName(); - public static final int MIN_THREADS = 1; public static final int MAX_THREADS = 15; public static final int POLL_WAIT = 20000; @@ -48,100 +47,34 @@ public class JobPoller implements Runnab private ThreadPoolExecutor executor = null; private String name = null; + protected JobPoller() { + } + /** * Creates a new JobScheduler - * @param jm JobManager associated with this scheduler + * + * @param jm + * JobManager associated with this scheduler */ public JobPoller(JobManager jm, boolean enabled) { - this.name = (jm.getDelegator() != null? jm.getDelegator().getDelegatorName(): "NA"); + this.name = (jm.getDelegator() != null ? jm.getDelegator().getDelegatorName() : "NA"); this.jm = jm; - this.executor = new ThreadPoolExecutor(minThreads(), - maxThreads(), - getTTL(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), - new JobInvokerThreadFactory(this.name), - new ThreadPoolExecutor.AbortPolicy()); - + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), + new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); if (enabled) { // re-load crashed jobs this.jm.reloadCrashedJobs(); - // start the thread only if polling is enabled if (pollEnabled()) { - // create the poller thread Thread thread = new Thread(this, "OFBiz-JobPoller-" + this.name); thread.setDaemon(false); - // start the poller thread.start(); } } } - protected JobPoller() {} - - public synchronized void run() { - try { - // wait 30 seconds before the first poll - java.lang.Thread.sleep(30000); - } catch (InterruptedException e) { - } - while (!executor.isShutdown()) { - try { - // grab a list of jobs to run. - List<Job> pollList = jm.poll(); - //Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module); - - for (Job job : pollList) { - if (job.isValid()) { - queueNow(job); - //Debug.logInfo("Job [" + job.getJobId() + "] is queued", module); - } - } - // NOTE: using sleep instead of wait for stricter locking - java.lang.Thread.sleep(pollWaitTime()); - } catch (InterruptedException e) { - Debug.logError(e, module); - stop(); - } - } - } - - /** - * Adds a job to the RUN queue - */ - public void queueNow(Job job) { - this.executor.execute(new JobInvoker(job)); - } - - /** - * Stops the JobPoller - */ - void stop() { - Debug.logInfo("Shutting down thread pool for JobPoller " + this.name, module); - this.executor.shutdown(); - try { - // Wait 60 seconds for existing tasks to terminate - if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) { - // abrupt shutdown (cancel currently executing tasks) - Debug.logInfo("Attempting abrupt shut down of thread pool for JobPoller " + this.name, module); - this.executor.shutdownNow(); - // Wait 60 seconds for tasks to respond to being cancelled - if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) { - Debug.logWarning("Unable to shutdown the thread pool for JobPoller " + this.name, module); - } - } - } catch (InterruptedException ie) { - // re cancel if current thread was also interrupted - this.executor.shutdownNow(); - // preserve interrupt status - Thread.currentThread().interrupt(); - } - Debug.logInfo("Shutdown completed of thread pool for JobPoller " + this.name, module); - } - /** * Returns the JobManager */ @@ -155,21 +88,18 @@ public class JobPoller implements Runnab poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name); poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>"); poolState.put("keepAliveTimeInSeconds", this.executor.getKeepAliveTime(TimeUnit.SECONDS)); - poolState.put("numberOfCoreInvokerThreads", this.executor.getCorePoolSize()); poolState.put("currentNumberOfInvokerThreads", this.executor.getPoolSize()); poolState.put("numberOfActiveInvokerThreads", this.executor.getActiveCount()); poolState.put("maxNumberOfInvokerThreads", this.executor.getMaximumPoolSize()); poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize()); - poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount()); - BlockingQueue<Runnable> queue = this.executor.getQueue(); List taskList = new ArrayList(); Map taskInfo = null; - for (Runnable task: queue) { + for (Runnable task : queue) { if (task instanceof JobInvoker) { - JobInvoker jobInvoker = (JobInvoker)task; + JobInvoker jobInvoker = (JobInvoker) task; taskInfo = new HashMap(); taskInfo.put("id", jobInvoker.getJobId()); taskInfo.put("name", jobInvoker.getJobName()); @@ -183,9 +113,18 @@ public class JobPoller implements Runnab return poolState; } + private long getTTL() { + long ttl = THREAD_TTL; + try { + ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl")); + } catch (NumberFormatException nfe) { + Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module); + } + return ttl; + } + private int maxThreads() { int max = MAX_THREADS; - try { max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads")); } catch (NumberFormatException nfe) { @@ -196,7 +135,6 @@ public class JobPoller implements Runnab private int minThreads() { int min = MIN_THREADS; - try { min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads")); } catch (NumberFormatException nfe) { @@ -205,9 +143,21 @@ public class JobPoller implements Runnab return min; } + private boolean pollEnabled() { + String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled"); + if (enabled.equalsIgnoreCase("false")) + return false; + + // also make sure we have a delegator to use for polling + if (jm.getDelegator() == null) { + Debug.logWarning("No delegator referenced; not starting job poller.", module); + return false; + } + return true; + } + private int pollWaitTime() { int poll = POLL_WAIT; - try { poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis")); } catch (NumberFormatException nfe) { @@ -216,30 +166,62 @@ public class JobPoller implements Runnab return poll; } - private long getTTL() { - long ttl = THREAD_TTL; + /** + * Adds a job to the RUN queue + */ + public void queueNow(Job job) { + this.executor.execute(new JobInvoker(job)); + } + public synchronized void run() { try { - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl")); - } catch (NumberFormatException nfe) { - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module); + // wait 30 seconds before the first poll + java.lang.Thread.sleep(30000); + } catch (InterruptedException e) { + } + while (!executor.isShutdown()) { + try { + // grab a list of jobs to run. + List<Job> pollList = jm.poll(); + // Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module); + for (Job job : pollList) { + if (job.isValid()) { + queueNow(job); + // Debug.logInfo("Job [" + job.getJobId() + "] is queued", module); + } + } + // NOTE: using sleep instead of wait for stricter locking + java.lang.Thread.sleep(pollWaitTime()); + } catch (InterruptedException e) { + Debug.logError(e, module); + stop(); + } } - return ttl; } - private boolean pollEnabled() { - String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled"); - - if (enabled.equalsIgnoreCase("false")) - return false; - - // also make sure we have a delegator to use for polling - if (jm.getDelegator() == null) { - Debug.logWarning("No delegator referenced; not starting job poller.", module); - return false; + /** + * Stops the JobPoller + */ + void stop() { + Debug.logInfo("Shutting down thread pool for JobPoller " + this.name, module); + this.executor.shutdown(); + try { + // Wait 60 seconds for existing tasks to terminate + if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) { + // abrupt shutdown (cancel currently executing tasks) + Debug.logInfo("Attempting abrupt shut down of thread pool for JobPoller " + this.name, module); + this.executor.shutdownNow(); + // Wait 60 seconds for tasks to respond to being cancelled + if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) { + Debug.logWarning("Unable to shutdown the thread pool for JobPoller " + this.name, module); + } + } + } catch (InterruptedException ie) { + // re cancel if current thread was also interrupted + this.executor.shutdownNow(); + // preserve interrupt status + Thread.currentThread().interrupt(); } - - return true; + Debug.logInfo("Shutdown completed of thread pool for JobPoller " + this.name, module); } } - |
| Free forum by Nabble | Edit this page |
