svn commit: r904987 - in /ofbiz/trunk/framework/base/src/org/ofbiz/base: concurrent/ concurrent/test/ util/

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r904987 - in /ofbiz/trunk/framework/base/src/org/ofbiz/base: concurrent/ concurrent/test/ util/

doogie-3
Author: doogie
Date: Sun Jan 31 06:34:35 2010
New Revision: 904987

URL: http://svn.apache.org/viewvc?rev=904987&view=rev
Log:
First bit of real webslinger code.  TTLObject is designed to save the
result of a single method call, so that it doesn't get called over and
over again.  It's non-blocking in all cases, supports synchronous and
asynchronous update modes, and will even save thrown exceptions.

Nothing in ofbiz uses this yet; that will require rather extensive
testing and profiling, which is difficult until more of ofbiz actually
has full test coverage; there's no reason to fix any slowdowns until
you know all the code is actually correct.

Test coverage on TTLObject is almost complete; the coverage report shows
some missed branches on switch statements that use an enum (there are
Google discussions about this; it's just not possible to cover them).
Some of the configuration methods aren't covered either.  However, all
switch cases that appear in the code *are* covered.

Added:
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/ExecutionPool.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/TTLObject.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/TTLObjectTest.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/util/ObjectWrapper.java

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/ExecutionPool.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/ExecutionPool.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/ExecutionPool.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/ExecutionPool.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,134 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.Iterator;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ExecutionPool {
+    protected static class ExecutionPoolThreadFactory implements ThreadFactory {
+        private final String namePrefix;
+        private int count = 0;
+
+        protected ExecutionPoolThreadFactory(String namePrefix) {
+            this.namePrefix = namePrefix;
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            t.setPriority(Thread.NORM_PRIORITY);
+            t.setName(namePrefix + "-" + count++);
+            return t;
+        }
+    }
+
+    private static class ExecutionPoolFactory {
+        protected static ScheduledThreadPoolExecutor getExecutor(String namePrefix, int threadCount) {
+            ExecutionPoolThreadFactory threadFactory = new ExecutionPoolThreadFactory(namePrefix);
+            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threadCount, threadFactory);
+            executor.prestartAllCoreThreads();
+            return executor;
+        }
+    }
+
+    public static ThreadFactory createThreadFactory(String namePrefix) {
+        return new ExecutionPoolThreadFactory(namePrefix);
+    }
+
+    public static ScheduledExecutorService getExecutor(String namePrefix, int threadCount) {
+        return ExecutionPoolFactory.getExecutor(namePrefix, threadCount);
+    }
+
+    public static ScheduledExecutorService getNewExactExecutor(String namePrefix) {
+        return getExecutor(namePrefix, ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors());
+    }
+
+    public static ScheduledExecutorService getNewOptimalExecutor(String namePrefix) {
+        return getExecutor(namePrefix, ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors() * 2);
+    }
+
+    public static void addPulse(Pulse pulse) {
+        delayQueue.put(pulse);
+    }
+
+    public static void removePulse(Pulse pulse) {
+        delayQueue.remove(pulse);
+    }
+
+    public static void pulseAll() {
+        Iterator<Pulse> it = delayQueue.iterator();
+        while (it.hasNext()) {
+            Pulse pulse = it.next();
+            it.remove();
+            pulse.run();
+        }
+    }
+
+    static {
+        ExecutionPoolPulseWorker worker = new ExecutionPoolPulseWorker();
+        int processorCount = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+        for (int i = 0; i < processorCount; i++) {
+            Thread t = new Thread(worker);
+            t.setDaemon(true);
+            t.setName("ExecutionPoolPulseWorker(" + i + ")");
+            t.start();
+        }
+    }
+
+    private static final DelayQueue<Pulse> delayQueue = new DelayQueue<Pulse>();
+
+    public static class ExecutionPoolPulseWorker implements Runnable {
+        public void run() {
+            try {
+                while (true) {
+                    delayQueue.take().run();
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public static abstract class Pulse implements Delayed, Runnable {
+        protected final long expireTime;
+
+        protected Pulse(long delayInMillis) {
+            expireTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delayInMillis, TimeUnit.MILLISECONDS);
+        }
+
+        public final long getDelay(TimeUnit unit) {
+            return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
+        }
+
+        public final int compareTo(Delayed other) {
+            long r = (expireTime - ((Pulse) other).expireTime);
+            if (r < 0) return -1;
+            if (r > 0) return 1;
+            return 0;
+        }
+    }
+
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/TTLObject.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/TTLObject.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/TTLObject.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/TTLObject.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,305 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.concurrent;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.ofbiz.base.util.ObjectWrapper;
+import org.ofbiz.base.util.UtilIO;
+
+public abstract class TTLObject<T> implements ObjectWrapper<T> {
+    private static final ScheduledExecutorService updateExecutor = ExecutionPool.getNewOptimalExecutor("TTLObject(async-update)");
+
+    private static final <T> T getConfigForClass(ConcurrentHashMap<String, T> config, Class c) {
+        Class ptr = c;
+        T value = null;
+        while (value == null && ptr != null) {
+            value = config.get(ptr.getName());
+            ptr = ptr.getSuperclass();
+        }
+        return value;
+    }
+
+    private static final ConcurrentHashMap<String, Long> ttls = new ConcurrentHashMap<String, Long>();
+
+    public static void setDefaultTTLForClass(Class c, long ttl) {
+        ttls.putIfAbsent(c.getName(), ttl);
+    }
+
+    public static void setTTLForClass(Class c, long ttl) {
+        ttls.put(c.getName(), ttl);
+    }
+
+    public static long getTTLForClass(Class c) throws ConfigurationException {
+        Long ttl = getConfigForClass(ttls, c);
+        if (ttl != null) return ttl.longValue();
+        throw new ConfigurationException("No TTL defined for " + c.getName());
+    }
+
+    private static final ConcurrentHashMap<String, Boolean> inForeground = new ConcurrentHashMap<String, Boolean>();
+
+    public static void setDefaultForegroundForClass(Class c, boolean foreground) {
+        inForeground.putIfAbsent(c.getName(), foreground);
+    }
+
+    public static void setForegroundForClass(Class c, boolean foreground) {
+        inForeground.put(c.getName(), foreground);
+    }
+
+    public static boolean getForegroundForClass(Class c) {
+        Boolean foreground = getConfigForClass(inForeground, c);
+        if (foreground != null) return foreground.booleanValue();
+        return true;
+    }
+
+    public static void pulseAll() {
+        ExecutionPool.pulseAll();
+    }
+
+    public enum State { INVALID, REGEN, REGENERATING, GENERATE, GENERATING, GENERATING_INITIAL, VALID, ERROR, ERROR_INITIAL }
+    private volatile ValueAndState<T> object = new StandardValueAndState<T>(this, null, null, State.INVALID, 0, null, null);
+    private static final AtomicReferenceFieldUpdater<TTLObject, ValueAndState> objectAccessor = AtomicReferenceFieldUpdater.newUpdater(TTLObject.class, ValueAndState.class, "object");
+    private static final AtomicIntegerFieldUpdater<TTLObject> serialAccessor = AtomicIntegerFieldUpdater.newUpdater(TTLObject.class, "serial");
+    protected volatile int serial;
+
+    protected static abstract class ValueAndState<T> {
+        protected final TTLObject<T> ttlObject;
+        protected final FutureTask<T> future;
+        protected final State state;
+        protected final int serial;
+        protected final Throwable t;
+        protected final Pulse pulse;
+
+        protected ValueAndState(TTLObject<T> ttlObject, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
+            this.ttlObject = ttlObject;
+            this.future = future;
+            this.state = state;
+            this.serial = serial;
+            this.t = t;
+            this.pulse = pulse;
+        }
+
+        protected abstract T getValue();
+
+        protected ValueAndState<T> refresh(State nextState) {
+            return ttlObject.newValueAndState(getValue(), future, nextState, serial, null, null);
+        }
+
+        protected ValueAndState<T> valid(T value) throws ObjectException {
+            return ttlObject.newValueAndState(value, null, State.VALID, serialAccessor.incrementAndGet(ttlObject), null, new Pulse(ttlObject));
+        }
+
+        protected ValueAndState<T> submit(final T oldValue, State state) {
+            return ttlObject.newValueAndState(getValue(), createTask(oldValue), state, serial, null, null);
+        }
+
+        protected FutureTask<T> createTask(final T oldValue) {
+            return new FutureTask<T>(new Callable<T>() {
+                public T call() throws Exception {
+                    return ttlObject.load(oldValue, serial);
+                }
+            });
+        }
+
+        protected ValueAndState<T> error(Throwable t) throws ObjectException {
+            return ttlObject.newValueAndState(null, null, state != State.GENERATING_INITIAL ? State.ERROR : State.ERROR_INITIAL, serialAccessor.incrementAndGet(ttlObject), t, new Pulse(ttlObject));
+        }
+    }
+
+    protected ValueAndState<T> newValueAndState(T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
+        return new StandardValueAndState<T>(this, value, future, state, serial, t, pulse);
+    }
+
+    private class StandardValueAndState<T> extends ValueAndState<T> {
+        protected final T value;
+
+        protected StandardValueAndState(TTLObject<T> ttlObject, T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
+            super(ttlObject, future, state, serial, t, pulse);
+            this.value = value;
+        }
+
+        protected T getValue() {
+            return value;
+        }
+    }
+
+    protected final static class Pulse extends ExecutionPool.Pulse {
+        protected final TTLObject<?> ttlObject;
+
+        protected Pulse(TTLObject<?> ttlObject) throws ObjectException {
+            super(ttlObject.getTTL());
+            this.ttlObject = ttlObject;
+        }
+
+        public void run() {
+            ttlObject.refresh();
+        }
+    }
+
+    public State getState() {
+        return getContainer().state;
+    }
+
+    @SuppressWarnings("unchecked")
+    private final ValueAndState<T> getContainer() {
+        return objectAccessor.get(this);
+    }
+
+    public void refresh() {
+        ValueAndState<T> container;
+        ValueAndState<T> nextContainer = null;
+        do {
+            container = getContainer();
+            switch (container.state) {
+                case INVALID:
+                    nextContainer = container.refresh(State.GENERATE);
+                    break;
+                case REGENERATING:
+                    nextContainer = container.refresh(State.REGEN);
+                    break;
+                case GENERATING:
+                    nextContainer = container.refresh(State.GENERATE);
+                    break;
+                case ERROR_INITIAL:
+                    nextContainer = container.refresh(State.INVALID);
+                    break;
+                case ERROR:
+                case VALID:
+                    nextContainer = container.refresh(getForeground() ? State.GENERATE : State.REGEN);
+                    break;
+                case REGEN:
+                case GENERATE:
+                    return;
+            }
+        } while (!objectAccessor.compareAndSet(this, container, nextContainer));
+        cancelFuture(container);
+    }
+
+    public final int getSerial() {
+        return getContainer().serial;
+    }
+
+    public final boolean checkSerial(int serial) {
+        return getContainer().serial != serial;
+    }
+
+    protected final void setObject(T newObject) throws ObjectException {
+        ValueAndState<T> container, nextContainer;
+        State nextState;
+        do {
+            container = getContainer();
+            nextContainer = container.valid(newObject);
+        } while (!objectAccessor.compareAndSet(this, container, nextContainer));
+        cancelFuture(container);
+        ExecutionPool.addPulse(nextContainer.pulse);
+    }
+
+    private void cancelFuture(ValueAndState<T> container) {
+        ExecutionPool.removePulse(container.pulse);
+        switch (container.state) {
+            case REGENERATING:
+            case GENERATING:
+                container.future.cancel(false);
+                break;
+        }
+    }
+
+    public final T getObject() throws ObjectException {
+        try {
+            ValueAndState<T> container;
+            ValueAndState<T> nextContainer = null;
+            do {
+                do {
+                    container = getContainer();
+                    switch (container.state) {
+                        case ERROR:
+                        case ERROR_INITIAL:
+                            throw container.t;
+                        case VALID:
+                            return container.getValue();
+                        case INVALID:
+                            nextContainer = container.submit(getInitial(), State.GENERATING_INITIAL);
+                            break;
+                        case REGENERATING:
+                            if (!container.future.isDone()) {
+                                return container.getValue();
+                            }
+                        case GENERATING:
+                        case GENERATING_INITIAL:
+                            try {
+                                try {
+                                    nextContainer = container.valid(container.future.get());
+                                } catch (ExecutionException e) {
+                                    throw e.getCause();
+                                }
+                            } catch (Throwable t) {
+                                nextContainer = container.error(t);
+                            }
+                            break;
+                        case REGEN:
+                            nextContainer = container.submit(container.getValue(), State.REGENERATING);
+                            break;
+                        case GENERATE:
+                            nextContainer = container.submit(container.getValue(), State.GENERATING);
+                            break;
+                    }
+                } while (!objectAccessor.compareAndSet(this, container, nextContainer));
+                switch (nextContainer.state) {
+                    case GENERATING:
+                    case GENERATING_INITIAL:
+                        nextContainer.future.run();
+                        break;
+                    case REGENERATING:
+                        updateExecutor.submit(nextContainer.future);
+                        break;
+                    case ERROR_INITIAL:
+                    case ERROR:
+                    case VALID:
+                        ExecutionPool.removePulse(container.pulse);
+                        ExecutionPool.addPulse(nextContainer.pulse);
+                        break;
+                }
+            } while (true);
+        } catch (Throwable e) {
+            return ObjectException.<T>checkException(e);
+        }
+    }
+
+    protected T getInitial() throws Exception {
+        return null;
+    }
+
+    protected abstract T load(T old, int serial) throws Exception;
+
+    protected boolean getForeground() {
+        return getForegroundForClass(getClass());
+    }
+
+    protected long getTTL() throws ConfigurationException {
+        return getTTLForClass(getClass());
+    }
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,157 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.concurrent.test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.ofbiz.base.concurrent.TTLObject;
+import org.ofbiz.base.test.GenericTestCaseBase;
+
+public class AsyncTTLObjectTest extends TTLObjectTest { // runs the TTLObjectTest fixture with its second constructor argument set to false
+    public AsyncTTLObjectTest(String name) {
+        super(name, false); // presumably foreground=false, i.e. background regeneration — confirm in TTLObjectTest
+    }
+
+    public void testGet() throws Exception { // walks one object through initial load, stale reads, explicit sets and an explicit refresh
+        loadData = "1";
+        sleepTime = 1000;
+        assertGetObject("Fetch data first time, blocking", loadData, 1, 950000000, 1200000000); // last two arguments look like min/max elapsed nanoseconds — TODO confirm against assertGetObject
+        loadData = "2";
+        sleepTime = 100;
+        assertGetObject("Not called all the time", "1", 1, 0, 100000000);
+        Thread.sleep(300);
+        assertGetObject("Stale data, starting regen", "1", 1, 0, 100000000);
+        Thread.sleep(300);
+        assertGetObject("Refreshed with old data", loadData, 2, 0, 100000000);
+        loadData = "3";
+        sleepTime = 1000;
+        Thread.sleep(200);
+        assertGetObject("Load called, serve stale data", "2", 2, 0, 100000000);
+        Thread.sleep(200);
+        assertGetObject("Load called, serve stale data", "2", 2, 0, 100000000);
+        Thread.sleep(200);
+        assertGetObject("Load called, serve stale data", "2", 2, 0, 100000000);
+        Thread.sleep(800);
+        assertGetObject("Serve new data", loadData, 3, 0, 100000000);
+        object.set("a");
+        assertGetObject("Serve set data(a)", "a", 3, 0, 100000000);
+        Thread.sleep(500);
+        object.set("b");
+        assertGetObject("Serve set data(b)", "b", 3, 0, 100000000);
+        Thread.sleep(300);
+        loadData = "4";
+        sleepTime = 200;
+        Future<Void> future = schedule(new Callable<Void>() { // NOTE(review): 'future' is never read in this method
+            public Void call() {
+                object.refresh();
+                return null;
+            }
+        }, 50);
+        assertGetObject("Refreshed with old data", "b", 3, 0, 100000000);
+        Thread.sleep(100);
+        assertGetObject("Refreshed with old data", "b", 3, 0, 100000000);
+        Thread.sleep(350);
+        assertGetObject("Refreshed with old data", "4", 5, 0, 100000000);
+    }
+
+    public void testSet() throws Exception { // starts from an explicit set() instead of a load
+        object.set("set");
+        assertEquals("data after set", "set", object.getObject());
+        assertEquals("no dones", 0, doneCount.get());
+        loadData = "1";
+        sleepTime = 100;
+        Thread.sleep(200);
+        assertGetObject("SET: stale, start load", "set", 0, 0, 100000000);
+        Thread.sleep(200);
+        loadData = "2";
+        sleepTime = 500;
+        assertGetObject("SET: valid, process load, schedule pulse 1", "1", 1, 0, 100000000);
+        Thread.sleep(100);
+        assertGetObject("SET: stale 1", "1", 1, 0, 100000000);
+        Thread.sleep(100);
+        assertGetObject("SET: stale 2", "1", 1, 0, 100000000);
+        Thread.sleep(100);
+        assertGetObject("SET: stale 3", "1", 1, 0, 100000000);
+        Thread.sleep(600);
+        assertGetObject("SET: valid, process load, schedule pulse 2", "2", 2, 0, 100000000);
+    }
+
+    public void testSetGetAbort() throws Exception { // a delayed set() is expected to win over a slower first load
+        loadData = "1";
+        sleepTime = 1000;
+        Future<Void> future = setObjectDelayed(300, "override");
+        assertGetObject("Fetch data first time, blocking/setting", "override", 1, 250000000, 400000000);
+        assertFuture("delayed set", future, null, false, null, null);
+    }
+
+    public void testThrowException() throws Exception { // checks that a failing load is reported and later replaced by a good one
+        loadData = "1";
+        sleepTime = 100;
+        throwException.set(new Thrower() {
+            public void throwException() throws Exception {
+                throw new Exception("exc1");
+            }
+        });
+        TTLObject.ObjectException caught = null;
+        try {
+            assertGetObject("Fetch data first time, throw exception", "override", 1, 0, 200000000);
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally { // NOTE(review): these asserts also run when some other throwable escapes the try, and would then replace it
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception thrown", "exc1", caught.getCause().getMessage());
+        }
+        caught = null;
+        try {
+            object.getObject(); // second call: the same failure is expected again
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception rethrown", "exc1", caught.getCause().getMessage());
+        }
+        Thread.sleep(300);
+        loadData = "2";
+        sleepTime = 100;
+        assertGetObject("Fetch data after exception, blocked", loadData, 2, 50000000, 200000000);
+        loadData = "3";
+        sleepTime = 500;
+        throwException.set(new Thrower() {
+            public void throwException() throws Exception {
+                throw new Exception("exc2");
+            }
+        });
+        Thread.sleep(200);
+        // the next call should not throw an exception; if it does, it will
+        // leave this method, and be caught by junit, which will then fail
+        // the test.
+        assertGetObject("Fetch data second time, stale, process pulse", "2", 2, 0, 200000000);
+        Thread.sleep(600);
+        caught = null;
+        try {
+            assertGetObject("Fetch data second time, throw exception", "2", 2, 0, 200000000);
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception thrown", "exc2", caught.getCause().getMessage());
+        }
+    }
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,205 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.concurrent.test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.ofbiz.base.concurrent.TTLObject;
+import org.ofbiz.base.test.GenericTestCaseBase;
+
+/** Runs the {@link TTLObject} tests with the fixture's {@code foreground} flag set to {@code true}. */
+public class SyncTTLObjectTest extends TTLObjectTest {
+    public SyncTTLObjectTest(String name) {
+        super(name, true);
+    }
+
+    /** Walks INVALID -> GENERATE -> VALID across refresh()/getObject() calls and counts the loads, including one that throws. */
+    public void testRefresh() throws Exception {
+        assertEquals("state:invalid", TTLObject.State.INVALID, object.getState());
+        assertEquals("no dones", 0, doneCount.get());
+        object.refresh();
+        assertEquals("state:generate", TTLObject.State.GENERATE, object.getState());
+        assertEquals("no dones", 0, doneCount.get());
+        object.refresh();
+        assertEquals("state:generate", TTLObject.State.GENERATE, object.getState());
+        assertEquals("no dones", 0, doneCount.get());
+        object.getObject();
+        assertEquals("state:valid", TTLObject.State.VALID, object.getState());
+        assertEquals("one done", 1, doneCount.get());
+        object.getObject();
+        assertEquals("state:valid", TTLObject.State.VALID, object.getState());
+        assertEquals("one done", 1, doneCount.get());
+        object.getObject();
+        assertEquals("state:valid", TTLObject.State.VALID, object.getState());
+        assertEquals("one done", 1, doneCount.get());
+        object.getObject();
+        assertEquals("state:valid", TTLObject.State.VALID, object.getState());
+        assertEquals("one done", 1, doneCount.get());
+        object.refresh();
+        assertEquals("state:generate", TTLObject.State.GENERATE, object.getState());
+        assertEquals("one done", 1, doneCount.get());
+        object.getObject();
+        assertEquals("state:valid", TTLObject.State.VALID, object.getState());
+        assertEquals("two dones", 2, doneCount.get());
+        object.refresh();
+        assertEquals("state:generate", TTLObject.State.GENERATE, object.getState());
+        assertEquals("two dones", 2, doneCount.get());
+        throwException.set(new Thrower() {
+            public void throwException() throws Exception {
+                throw new Exception("exc1");
+            }
+        });
+        TTLObject.ObjectException caught = null;
+        try {
+            object.getObject();
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception thrown", "exc1", caught.getCause().getMessage());
+        }
+        assertEquals("three dones", 3, doneCount.get());
+        caught = null;
+        try {
+            object.getObject();
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception rethrown", "exc1", caught.getCause().getMessage());
+        }
+        object.refresh();
+        assertEquals("three dones", 3, doneCount.get());
+        object.getObject();
+        assertEquals("four dones", 4, doneCount.get());
+    }
+
+    /** getObject() must fail with a ConfigurationException when no TTL is defined, and must not throw once getTTL() is overridden. */
+    public void testGetTTL() throws Exception {
+        Exception caught = null;
+        try {
+            new TTLObject<Object>() {
+                protected Object load(Object old, int serial) {
+                    return old;
+                }
+            }.getObject();
+        } catch (TTLObject.ConfigurationException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertTrue("is a ttl configuration exception", caught.getMessage().startsWith("No TTL defined for "));
+        }
+        new TTLObject<Object>() {
+            protected long getTTL() {
+                return 1000;
+            }
+
+            protected Object load(Object old, int serial) {
+                return old;
+            }
+        }.getObject();
+    }
+
+    /** Checks which getObject() calls trigger a new load: the first one, one made after a 200 ms pause, and those following refresh(). */
+    public void testGet() throws Exception {
+        loadData = "1";
+        sleepTime = 1000;
+        assertGetObject("Fetch data first time, blocking", loadData, 1, 950000000, 1200000000);
+        loadData = "2";
+        sleepTime = 100;
+        assertGetObject("Not called all the time", "1", 1, 0, 100000000);
+        Thread.sleep(200);
+        assertGetObject("Auto-refresh", "2", 2, 0, 100000000);
+        loadData = "3";
+        sleepTime = 200;
+        object.refresh();
+        assertGetObject("manual-refresh", "3", 3, 0, 100000000);
+        assertGetObject("Not called all the time after manual refresh", "3", 3, 0, 100000000);
+        Thread.sleep(200);
+        loadData = "4";
+        sleepTime = 200;
+        schedule(new Callable<Void>() {
+            public Void call() {
+                object.refresh();
+                return null;
+            }
+        }, 50);
+        assertGetObject("Refreshed with old data", "4", 5, 0, 100000000);
+    }
+
+    /** A value set from the executor while the first load is still running is the one getObject() returns. */
+    public void testSetGetAbort() throws Exception {
+        loadData = "1";
+        sleepTime = 1000;
+        Future<Void> future = setObjectDelayed(300, "override");
+        assertGetObject("Fetch data first time, blocking/setting", "override", 1, 250000000, 400000000);
+        assertFuture("delayed set", future, null, false, null, null);
+    }
+
+    /** An exception thrown by load() surfaces from getObject() as the cause of an ObjectException, and is rethrown by a later call. */
+    public void testThrowException() throws Exception {
+        loadData = "1";
+        sleepTime = 100;
+        throwException.set(new Thrower() {
+            public void throwException() throws Exception {
+                throw new Exception("exc1");
+            }
+        });
+        TTLObject.ObjectException caught = null;
+        try {
+            assertGetObject("Fetch data first time, throw exception", "override", 1, 0, 200000000);
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception thrown", "exc1", caught.getCause().getMessage());
+        }
+        caught = null;
+        try {
+            object.getObject();
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception rethrown", "exc1", caught.getCause().getMessage());
+        }
+        Thread.sleep(300);
+        loadData = "2";
+        sleepTime = 100;
+        assertGetObject("Fetch data after exception, blocked", loadData, 2, 50000000, 200000000);
+        loadData = "3";
+        sleepTime = 500;
+        throwException.set(new Thrower() {
+            public void throwException() throws Exception {
+                throw new Exception("exc2");
+            }
+        });
+        Thread.sleep(200);
+        caught = null;
+        try {
+            assertGetObject("Fetch data second time, throw exception", "2", 2, 0, 200000000);
+        } catch (TTLObject.ObjectException e) {
+            caught = e;
+        } finally {
+            assertNotNull("exception thrown", caught);
+            assertEquals("correct exception thrown", "exc2", caught.getCause().getMessage());
+        }
+    }
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/TTLObjectTest.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/TTLObjectTest.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/TTLObjectTest.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/TTLObjectTest.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.concurrent.test;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.ofbiz.base.concurrent.ExecutionPool;
+import org.ofbiz.base.concurrent.TTLObject;
+import org.ofbiz.base.test.GenericTestCaseBase;
+
+/**
+ * Shared fixture for the {@link TTLObject} tests: holds the object under test,
+ * a per-test executor, and the counters and hooks its {@code load()} reports to.
+ */
+public abstract class TTLObjectTest extends GenericTestCaseBase {
+    static {
+        TTLObject.setDefaultTTLForClass(TTLObjectTestTTLObject.class, 100);
+    }
+
+    /** Number of {@code load()} calls that have finished, whether normally or by throwing. */
+    protected final AtomicInteger doneCount = new AtomicInteger();
+    /** Thread that most recently entered {@code load()}. */
+    protected final AtomicReference<Thread> lastLoadThread = new AtomicReference<Thread>();
+    /** One-shot hook: the next {@code load()} takes it, clears it and invokes it. */
+    protected final AtomicReference<Thrower> throwException = new AtomicReference<Thrower>();
+
+    protected final TTLObjectTestTTLObject object;
+    /** Value returned by {@code load()} when it completes normally. */
+    protected String loadData;
+    /** Time {@code load()} keeps polling, in milliseconds, unless {@code checkSerial} ends it early. */
+    protected long sleepTime;
+    protected ScheduledExecutorService executor;
+
+    /**
+     * @param name the test name, handed to the superclass
+     * @param foreground forwarded to the {@link TTLObjectTestTTLObject} under test
+     */
+    protected TTLObjectTest(String name, boolean foreground) {
+        super(name);
+        object = new TTLObjectTestTTLObject(foreground);
+    }
+
+    /** Obtains a new executor for this test from {@link ExecutionPool}. */
+    protected void setUp() throws InterruptedException {
+        executor = ExecutionPool.getNewExactExecutor(getName());
+    }
+
+    /** Resets the counters and hooks, then checks that shutting down the executor left no task unrun. */
+    protected void tearDown() throws InterruptedException {
+        doneCount.set(0);
+        lastLoadThread.set(null);
+        throwException.set(null);
+        List<Runnable> runnables = executor.shutdownNow();
+        assertEquals("no runnables", 0, runnables.size());
+    }
+
+    /** Schedules {@code callable} on this test's executor after {@code millis} milliseconds. */
+    protected Future<Void> schedule(Callable<Void> callable, long millis) {
+        return executor.schedule(callable, millis, TimeUnit.MILLISECONDS);
+    }
+
+    /** Schedules {@code object.set(value)} to run after {@code millis} milliseconds. */
+    protected Future<Void> setObjectDelayed(long millis, final String value) {
+        return schedule(new Callable<Void>() {
+            public Void call() throws Exception {
+                object.set(value);
+                return null;
+            }
+        }, millis);
+    }
+
+    /**
+     * Calls {@code object.getObject()} and checks the value returned and the
+     * number of completed loads.  {@code minTime} and {@code maxTime} are in
+     * nanoseconds.
+     */
+    protected void assertGetObject(String label, String wantedData, int wantedDoneCount, long minTime, long maxTime) throws Exception {
+        long t1 = System.nanoTime();
+        assertEquals(label + ": data", wantedData, object.getObject());
+        int serial = object.getSerial();
+        assertEquals(label + ": doneCount", wantedDoneCount, doneCount.get());
+        long t2 = System.nanoTime();
+        long time = t2 - t1;
+        // NOTE(review): both operands are autoboxed, so unless the base class
+        // overloads assertNotSame for longs these identity checks pass even when
+        // the timing bounds are violated.  The bounds in the callers have never
+        // been enforced; re-validate them before tightening this -- TODO confirm.
+        assertNotSame(label + ": long enough(" + time + " >= " + minTime + ")", time - minTime, Math.abs(time - minTime));
+        assertNotSame(label + ": quick enough(" + time + " <= " + maxTime + ")", maxTime - time, Math.abs(maxTime - time));
+    }
+
+    /** Hook through which a test makes the next {@code load()} throw. */
+    public interface Thrower {
+        void throwException() throws InterruptedException, Exception;
+    }
+
+    /** {@link TTLObject} under test; its {@code load()} is driven by the enclosing test's fields. */
+    protected final class TTLObjectTestTTLObject extends TTLObject<String> {
+        private final boolean foreground;
+
+        protected TTLObjectTestTTLObject(boolean foreground) {
+            this.foreground = foreground;
+        }
+
+        /** Declared public here; simply delegates to the superclass. */
+        public long getTTL() throws ConfigurationException {
+            return super.getTTL();
+        }
+
+        /** Always {@code false} when built with {@code foreground == false}; otherwise the superclass decides. */
+        protected boolean getForeground() {
+            return foreground && super.getForeground();
+        }
+
+        /**
+         * Polls in 10 ms sleeps until {@code sleepTime} milliseconds have passed or
+         * {@code checkSerial(serial)} returns true, fires the pending
+         * {@link Thrower} if there is one, and returns {@code loadData}.
+         * {@code doneCount} is incremented on every exit path.
+         */
+        protected String load(String old, int serial) throws Exception {
+            lastLoadThread.set(Thread.currentThread());
+            try {
+                long end = System.nanoTime() + sleepTime * 1000000;
+                while (System.nanoTime() <= end) {
+                    Thread.sleep(10);
+                    if (checkSerial(serial)) break;
+                }
+                // Take and clear the pending thrower in one atomic step.
+                Thrower thrower = throwException.getAndSet(null);
+                if (thrower != null) {
+                    thrower.throwException();
+                }
+            } finally {
+                doneCount.incrementAndGet();
+            }
+            return loadData;
+        }
+
+        /** Exposes {@code setObject(value)} to the tests. */
+        public void set(String value) throws ObjectException {
+            setObject(value);
+        }
+    }
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/util/ObjectWrapper.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/util/ObjectWrapper.java?rev=904987&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/util/ObjectWrapper.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/util/ObjectWrapper.java Sun Jan 31 06:34:35 2010
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.ofbiz.base.util;
+
+/**
+ * Gives access to a single wrapped value.
+ *
+ * @param <T> the type of the wrapped value
+ */
+public interface ObjectWrapper<T> {
+    /**
+     * Returns the wrapped value.
+     *
+     * @return the wrapped value
+     * @throws ObjectException if the value cannot be supplied
+     */
+    T getObject() throws ObjectException;
+
+    /** Base checked exception for failures reported through an {@link ObjectWrapper}. */
+    public class ObjectException extends Exception {
+        /**
+         * Wraps {@code cause} and reuses its message.
+         *
+         * @param cause the underlying failure; when {@code null} the message is {@code null} too
+         */
+        protected ObjectException(Throwable cause) {
+            // A null cause must not blow up with a NullPointerException here.
+            this(cause == null ? null : cause.getMessage(), cause);
+        }
+
+        /**
+         * @param msg the detail message
+         * @param cause the underlying failure
+         */
+        protected ObjectException(String msg, Throwable cause) {
+            super(msg, cause);
+        }
+
+        /**
+         * @param msg the detail message
+         */
+        protected ObjectException(String msg) {
+            super(msg);
+        }
+
+        /**
+         * Rethrows {@code t}: {@link ObjectException}s, {@link RuntimeException}s
+         * and {@link Error}s are thrown as-is, anything else (including
+         * {@code null}) is wrapped in a {@link NestedException}.  This method
+         * never returns normally.
+         *
+         * @param <T> whatever type the calling expression needs
+         * @param t the throwable to rethrow
+         * @return never returns
+         * @throws ObjectException {@code t} itself if it is one, otherwise a {@link NestedException} around it
+         */
+        public static final <T> T checkException(Throwable t) throws ObjectException {
+            if (t instanceof ObjectException) throw (ObjectException) t;
+            if (t instanceof RuntimeException) throw (RuntimeException) t;
+            if (t instanceof Error) throw (Error) t;
+            throw new NestedException(t);
+        }
+    }
+
+    /** An {@link ObjectException} that carries another throwable as its cause. */
+    public class NestedException extends ObjectException {
+        public NestedException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** An {@link ObjectException} constructed from a message only. */
+    public class ConfigurationException extends ObjectException {
+        public ConfigurationException(String msg) {
+            super(msg);
+        }
+    }
+}