001 package dk.deepthought.sidious.util;
002 
003 import java.util.LinkedList;
004 import java.util.List;
005 
006 import net.jcip.annotations.ThreadSafe;
007 
008 import org.apache.commons.logging.Log;
009 import org.apache.commons.logging.LogFactory;
010 
011 /**
012  * Abstract representation of a queue which uses a thread to dequeue and process
013  * elements.
014  <p>
015  * Implements a work queue; which allows for asynchronous processing of enqueued
016  * items.
017  
018  @author Deepthought
019  
020  */
021 @ThreadSafe
022 public abstract class SidiousQueue<T> {
023     /**
024      * Logger for this class
025      */
026     private static final Log logger = LogFactory.getLog(SidiousQueue.class);
027 
028     /**
029      * Private thread implementation.
030      */
031     private class InternalThread implements Runnable {
032         public void run() {
033             if (logger.isDebugEnabled()) {
034                 logger.debug("run() - start");
035             }
036 
037             while (true) {
038                 T item = null;
039                 synchronized (queue) {
040                     while (queue.isEmpty() && !interrupted) {
041                         try {
042                             if (logger.isDebugEnabled()) {
043                                 logger.debug("run() - waiting - start");
044                             }
045                             queue.wait();
046                             if (logger.isDebugEnabled()) {
047                                 logger.debug("run() - waiting - end");
048                             }
049                         catch (InterruptedException e) {
050                             logger.error("run()", e);
051                             return;
052                         }
053                     }
054                     if (interrupted) {
055                         if (logger.isDebugEnabled()) {
056                             logger.debug("run() - interrupted");
057                         }
058 
059                         return;
060                     }
061                     item = queue.remove(0);
062                     if (logger.isDebugEnabled()) {
063                         logger.debug("run() - " + item + " dequeued");
064                     }
065                 }
066                 process(item);
067             }
068         }
069     }
070 
071     /**
072      * Boolean to indicate whether the thread has been interrupted.
073      */
074     private boolean interrupted = false;
075 
076     /**
077      * Internal queue.
078      */
079     private final List<T> queue = new LinkedList<T>();
080 
081     /**
082      * Constructor that starts a new thread.
083      */
084     protected SidiousQueue(String name) {
085         startThread(name);
086     }
087 
088     /**
089      * Enqueues an item.
090      
091      @param item
092      *            the item to be enqueued
093      */
094     public final void enqueue(T item) {
095         if (logger.isDebugEnabled()) {
096             logger.debug("enqueue(item=" + item + ") - start");
097         }
098         synchronized (queue) {
099             queue.add(item);
100             queue.notify();
101         }
102 
103         if (logger.isDebugEnabled()) {
104             logger.debug("enqueue(item=" + item + ") - end");
105         }
106     }
107 
108     /**
109      * Interrupts the thread.
110      */
111     public final void interrupt() {
112         if (logger.isDebugEnabled()) {
113             logger.debug("interrupt() - start");
114         }
115         synchronized (queue) {
116             interrupted = true;
117             queue.notify();
118         }
119         if (logger.isDebugEnabled()) {
120             logger.debug("interrupt() - end");
121         }
122     }
123 
124     /**
125      * The processing of an item.
126      
127      @param item
128      *            the item to be processed.
129      */
130     protected abstract void process(T item);
131 
132     /**
133      * Method to start the thread.
134      
135      @param name
136      *            the name of the thread
137      */
138     private void startThread(String name) {
139         Thread thread = new Thread(new InternalThread(), name);
140         thread.setDaemon(false);
141         thread.start();
142         if (logger.isDebugEnabled()) {
143             logger.debug("startThread() - Thread started");
144         }
145     }
146 
147 }