svn commit: r1369298 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service: ServiceContainer.java job/JobManager.java job/JobPoller.java

Previous Topic | Next Topic
 
Classic | List | Threaded
1 message Options
Reply | Threaded | Open this post in threaded view

svn commit: r1369298 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service: ServiceContainer.java job/JobManager.java job/JobPoller.java

adrianc
Author: adrianc
Date: Sat Aug  4 12:00:07 2012
New Revision: 1369298

URL: http://svn.apache.org/viewvc?rev=1369298&view=rev
Log:
A bunch of fixes for JobManager and JobPoller:

1. Fixed some concurrency issues during object construction.
2. Cleaned up some feature-envy code.
3. Added code for orderly startup and shutdown.

Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java?rev=1369298&r1=1369297&r2=1369298&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java Sat Aug  4 12:00:07 2012
@@ -18,16 +18,17 @@
  *******************************************************************************/
 package org.ofbiz.service;
 
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.ofbiz.base.container.Container;
 import org.ofbiz.base.container.ContainerConfig;
 import org.ofbiz.base.container.ContainerException;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.base.util.UtilValidate;
 import org.ofbiz.entity.Delegator;
-
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import org.ofbiz.service.job.JobManager;
 
 /**
  * A container for the service engine.
@@ -64,6 +65,7 @@ public class ServiceContainer implements
 
     @Override
     public void stop() throws ContainerException {
+        JobManager.shutDown();
         Set<String> dispatcherNames = getAllDispatcherNames();
         for (String dispatcherName: dispatcherNames) {
             deregister(dispatcherName);

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1369298&r1=1369297&r2=1369298&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug  4 12:00:07 2012
@@ -21,14 +21,15 @@ package org.ofbiz.service.job;
 import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 
 import javolution.util.FastList;
 
+import org.ofbiz.base.util.Assert;
 import org.ofbiz.base.util.Debug;
-import org.ofbiz.base.util.GeneralRuntimeException;
 import org.ofbiz.base.util.UtilDateTime;
 import org.ofbiz.base.util.UtilMisc;
 import org.ofbiz.base.util.UtilProperties;
@@ -54,18 +55,31 @@ import org.ofbiz.service.config.ServiceC
 /**
  * JobManager
  */
-public class JobManager {
+public final class JobManager {
 
     public static final String module = JobManager.class.getName();
     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
-    public static final Map<String, Object> updateFields = UtilMisc.<String, Object> toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
-    private static final Map<String, JobManager> registeredManagers = new HashMap<String, JobManager>();
+    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
+    private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
+    private static boolean isShutDown = false;
 
-    public synchronized static JobManager getInstance(Delegator delegator, boolean enabled) {
+    private static void assertIsRunning() {
+        if (isShutDown) {
+            throw new IllegalStateException("OFBiz shutting down");
+        }
+    }
+
+    public static JobManager getInstance(Delegator delegator, boolean enablePoller) {
+        assertIsRunning();
+        Assert.notNull("delegator", delegator);
         JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
         if (jm == null) {
-            jm = new JobManager(delegator, enabled);
-            JobManager.registeredManagers.put(delegator.getDelegatorName(), jm);
+            jm = new JobManager(delegator);
+            JobManager.registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm);
+            jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
+            if (enablePoller) {
+                jm.enablePoller();
+            }
         }
         return jm;
     }
@@ -79,7 +93,6 @@ public class JobManager {
                     return null;
                 }
                 GenericValue ri = job.getRelatedOne("RecurrenceInfo", false);
-
                 if (ri != null) {
                     return new RecurrenceInfo(ri);
                 } else {
@@ -89,30 +102,35 @@ public class JobManager {
                 return null;
             }
         } catch (GenericEntityException e) {
-            e.printStackTrace();
             Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module);
         } catch (RecurrenceInfoException re) {
-            re.printStackTrace();
             Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module);
         }
         return null;
     }
 
-    protected Delegator delegator;
-    protected JobPoller jp;
-
-    private JobManager(Delegator delegator, boolean enabled) {
-        if (delegator == null) {
-            throw new GeneralRuntimeException("ERROR: null delegator passed, cannot create JobManager");
+    public static void shutDown() {
+        isShutDown = true;
+        for (JobManager jm : registeredManagers.values()) {
+            jm.shutdown();
         }
+    }
+
+    private final Delegator delegator;
+    private final JobPoller jp;
+    private boolean pollerEnabled = false;
+
+    private JobManager(Delegator delegator) {
         this.delegator = delegator;
-        jp = new JobPoller(this, enabled);
+        jp = new JobPoller(this);
     }
 
-    @Override
-    public void finalize() throws Throwable {
-        this.shutdown();
-        super.finalize();
+    private synchronized void enablePoller() {
+        if (!pollerEnabled) {
+            pollerEnabled = true;
+            reloadCrashedJobs();
+            jp.enable();
+        }
     }
 
     /** Returns the Delegator. */
@@ -136,6 +154,7 @@ public class JobManager {
     }
 
     public synchronized List<Job> poll() {
+        assertIsRunning();
         List<Job> poll = FastList.newInstance();
         // sort the results by time
         List<String> order = UtilMisc.toList("runTime");
@@ -215,8 +234,7 @@ public class JobManager {
         return poll;
     }
 
-    public synchronized void reloadCrashedJobs() {
-        String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
+    private void reloadCrashedJobs() {
         List<GenericValue> crashed = null;
         List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
         exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
@@ -231,8 +249,7 @@ public class JobManager {
                 int rescheduled = 0;
                 for (GenericValue job : crashed) {
                     Timestamp now = UtilDateTime.nowTimestamp();
-                    Debug.log("Scheduling Job : " + job, module);
-
+                    Debug.logInfo("Scheduling Job : " + job, module);
                     String pJobId = job.getString("parentJobId");
                     if (pJobId == null) {
                         pJobId = job.getString("jobId");
@@ -245,12 +262,10 @@ public class JobManager {
                     newJob.set("startDateTime", null);
                     newJob.set("runByInstanceId", null);
                     delegator.createSetNextSeqId(newJob);
-
                     // set the cancel time on the old job to the same as the re-schedule time
                     job.set("statusId", "SERVICE_CRASHED");
                     job.set("cancelDateTime", now);
                     delegator.store(job);
-
                     rescheduled++;
                 }
                 if (Debug.infoOn())
@@ -264,8 +279,12 @@ public class JobManager {
         }
     }
 
-    /** Queues a Job to run now. */
+    /** Queues a Job to run now.
+     * @throws IllegalStateException if the Job Manager is shut down.
+     * @throws RejectedExecutionException if the poller is stopped.
+     */
     public void runJob(Job job) throws JobManagerException {
+        assertIsRunning();
         if (job.isValid()) {
             jp.queueNow(job);
         }
@@ -400,10 +419,6 @@ public class JobManager {
      */
     public void schedule(String jobName, String poolName, String serviceName, Map<String, ? extends Object> context, long startTime,
             int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException {
-        if (delegator == null) {
-            Debug.logWarning("No delegator referenced; cannot schedule job.", module);
-            return;
-        }
         // persist the context
         String dataId = null;
         try {
@@ -445,13 +460,11 @@ public class JobManager {
      *            The time in milliseconds the service should expire
      *@param maxRetry
      *            The max number of retries on failure (-1 for no max)
+     * @throws IllegalStateException if the Job Manager is shut down.
      */
     public void schedule(String jobName, String poolName, String serviceName, String dataId, long startTime, int frequency, int interval,
             int count, long endTime, int maxRetry) throws JobManagerException {
-        if (delegator == null) {
-            Debug.logWarning("No delegator referenced; cannot schedule job.", module);
-            return;
-        }
+        assertIsRunning();
         // create the recurrence
         String infoId = null;
         if (frequency > -1 && count != 0) {
@@ -491,11 +504,9 @@ public class JobManager {
 
     /** Close out the scheduler thread. */
     public void shutdown() {
-        if (jp != null) {
-            Debug.logInfo("Stopping the JobManager...", module);
-            jp.stop();
-            jp = null;
-        }
+        Debug.logInfo("Stopping the JobManager...", module);
+        registeredManagers.remove(delegator.getDelegatorName(), this);
+        jp.stop();
         Debug.logInfo("JobManager stopped.", module);
     }
 

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1369298&r1=1369297&r2=1369298&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug  4 12:00:07 2012
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +36,7 @@ import org.apache.commons.lang.math.Numb
 /**
  * JobPoller - Polls for persisted jobs to run.
  */
-public class JobPoller implements Runnable {
+public final class JobPoller implements Runnable {
 
     public static final String module = JobPoller.class.getName();
     public static final int MIN_THREADS = 1;
@@ -43,12 +44,10 @@ public class JobPoller implements Runnab
     public static final int POLL_WAIT = 20000;
     public static final long THREAD_TTL = 18000000;
 
-    private JobManager jm = null;
-    private ThreadPoolExecutor executor = null;
-    private String name = null;
-
-    protected JobPoller() {
-    }
+    private final JobManager jm;
+    private final ThreadPoolExecutor executor;
+    private final String name;
+    private boolean enabled = false;
 
     /**
      * Creates a new JobScheduler
@@ -56,14 +55,16 @@ public class JobPoller implements Runnab
      * @param jm
      *            JobManager associated with this scheduler
      */
-    public JobPoller(JobManager jm, boolean enabled) {
-        this.name = (jm.getDelegator() != null ? jm.getDelegator().getDelegatorName() : "NA");
+    public JobPoller(JobManager jm) {
+        this.name = jm.getDelegator().getDelegatorName();
         this.jm = jm;
         this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                 new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
-        if (enabled) {
-            // re-load crashed jobs
-            this.jm.reloadCrashedJobs();
+    }
+
+    public synchronized void enable() {
+        if (!enabled) {
+            enabled = true;
             // start the thread only if polling is enabled
             if (pollEnabled()) {
                 // create the poller thread
@@ -83,7 +84,7 @@ public class JobPoller implements Runnab
     }
 
     public Map<String, Object> getPoolState() {
-        Map poolState = new HashMap();
+        Map<String, Object> poolState = new HashMap<String, Object>();
         poolState.put("pollerName", this.name);
         poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name);
         poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>");
@@ -95,19 +96,17 @@ public class JobPoller implements Runnab
         poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize());
         poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount());
         BlockingQueue<Runnable> queue = this.executor.getQueue();
-        List taskList = new ArrayList();
-        Map taskInfo = null;
+        List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
+        Map<String, Object> taskInfo = null;
         for (Runnable task : queue) {
-            if (task instanceof JobInvoker) {
-                JobInvoker jobInvoker = (JobInvoker) task;
-                taskInfo = new HashMap();
-                taskInfo.put("id", jobInvoker.getJobId());
-                taskInfo.put("name", jobInvoker.getJobName());
-                taskInfo.put("serviceName", jobInvoker.getServiceName());
-                taskInfo.put("time", jobInvoker.getTime());
-                taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
-                taskList.add(taskInfo);
-            }
+            JobInvoker jobInvoker = (JobInvoker) task;
+            taskInfo = new HashMap<String, Object>();
+            taskInfo.put("id", jobInvoker.getJobId());
+            taskInfo.put("name", jobInvoker.getJobName());
+            taskInfo.put("serviceName", jobInvoker.getServiceName());
+            taskInfo.put("time", jobInvoker.getTime());
+            taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
+            taskList.add(taskInfo);
         }
         poolState.put("taskList", taskList);
         return poolState;
@@ -147,12 +146,6 @@ public class JobPoller implements Runnab
         String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
         if (enabled.equalsIgnoreCase("false"))
             return false;
-
-        // also make sure we have a delegator to use for polling
-        if (jm.getDelegator() == null) {
-            Debug.logWarning("No delegator referenced; not starting job poller.", module);
-            return false;
-        }
         return true;
     }
 
@@ -167,7 +160,8 @@ public class JobPoller implements Runnab
     }
 
     /**
-     * Adds a job to the RUN queue
+     * Adds a job to the RUN queue.
+     * @throws RejectedExecutionException if the poller is stopped.
      */
     public void queueNow(Job job) {
         this.executor.execute(new JobInvoker(job));
@@ -197,6 +191,7 @@ public class JobPoller implements Runnab
                 stop();
             }
         }
+        Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
     }
 
     /**