001 package dk.deepthought.sidious.util;
002
003 import java.util.LinkedList;
004 import java.util.List;
005
006 import net.jcip.annotations.ThreadSafe;
007
008 import org.apache.commons.logging.Log;
009 import org.apache.commons.logging.LogFactory;
010
011 /**
012 * Abstract representation of a queue which uses a thread to dequeue and process
013 * elements.
014 * <p>
015 * Implements a work queue; which allows for asynchronous processing of enqueued
016 * items.
017 *
018 * @author Deepthought
019 *
020 */
021 @ThreadSafe
022 public abstract class SidiousQueue<T> {
023 /**
024 * Logger for this class
025 */
026 private static final Log logger = LogFactory.getLog(SidiousQueue.class);
027
028 /**
029 * Private thread implementation.
030 */
031 private class InternalThread implements Runnable {
032 public void run() {
033 if (logger.isDebugEnabled()) {
034 logger.debug("run() - start");
035 }
036
037 while (true) {
038 T item = null;
039 synchronized (queue) {
040 while (queue.isEmpty() && !interrupted) {
041 try {
042 if (logger.isDebugEnabled()) {
043 logger.debug("run() - waiting - start");
044 }
045 queue.wait();
046 if (logger.isDebugEnabled()) {
047 logger.debug("run() - waiting - end");
048 }
049 } catch (InterruptedException e) {
050 logger.error("run()", e);
051 return;
052 }
053 }
054 if (interrupted) {
055 if (logger.isDebugEnabled()) {
056 logger.debug("run() - interrupted");
057 }
058
059 return;
060 }
061 item = queue.remove(0);
062 if (logger.isDebugEnabled()) {
063 logger.debug("run() - " + item + " dequeued");
064 }
065 }
066 process(item);
067 }
068 }
069 }
070
071 /**
072 * Boolean to indicate whether the thread has been interrupted.
073 */
074 private boolean interrupted = false;
075
076 /**
077 * Internal queue.
078 */
079 private final List<T> queue = new LinkedList<T>();
080
081 /**
082 * Constructor that starts a new thread.
083 */
084 protected SidiousQueue(String name) {
085 startThread(name);
086 }
087
088 /**
089 * Enqueues an item.
090 *
091 * @param item
092 * the item to be enqueued
093 */
094 public final void enqueue(T item) {
095 if (logger.isDebugEnabled()) {
096 logger.debug("enqueue(item=" + item + ") - start");
097 }
098 synchronized (queue) {
099 queue.add(item);
100 queue.notify();
101 }
102
103 if (logger.isDebugEnabled()) {
104 logger.debug("enqueue(item=" + item + ") - end");
105 }
106 }
107
108 /**
109 * Interrupts the thread.
110 */
111 public final void interrupt() {
112 if (logger.isDebugEnabled()) {
113 logger.debug("interrupt() - start");
114 }
115 synchronized (queue) {
116 interrupted = true;
117 queue.notify();
118 }
119 if (logger.isDebugEnabled()) {
120 logger.debug("interrupt() - end");
121 }
122 }
123
124 /**
125 * The processing of an item.
126 *
127 * @param item
128 * the item to be processed.
129 */
130 protected abstract void process(T item);
131
132 /**
133 * Method to start the thread.
134 *
135 * @param name
136 * the name of the thread
137 */
138 private void startThread(String name) {
139 Thread thread = new Thread(new InternalThread(), name);
140 thread.setDaemon(false);
141 thread.start();
142 if (logger.isDebugEnabled()) {
143 logger.debug("startThread() - Thread started");
144 }
145 }
146
147 }
|