distributionQueue.h

Go to the documentation of this file.
00001 
00014 #ifndef _DLR_THREAD_DISTRIBUTIONQUEUE_H_
00015 #define _DLR_THREAD_DISTRIBUTIONQUEUE_H_
00016 
00017 #include <map>
00018 #include <boost/thread.hpp>
00019 #include <dlrThread/clientID.h>
00020 #include <dlrThread/exception.h>
00021 #include <dlrThread/monitor.h>
00022 #include <dlrUtilities/timeUtilities.h>
00023 
00024 
00025 namespace dlr {
00026 
00027   namespace thread {
00028 
00056     template <class Type>
00057     class DistributionQueue
00058       : public Monitor
00059     {
00060     public:
00061 
00073       typedef ClientID<Type> ClientID;
00074 
00075 
00093       DistributionQueue(size_t maximumLength, size_t numberOfClients=2);
00094 
00095 
00103       DistributionQueue(const DistributionQueue<Type>& source);
00104 
00105 
00109       virtual
00110       ~DistributionQueue();
00111 
00112 
00121       DistributionQueue<Type>&
00122       operator=(const DistributionQueue<Type>& source);
00123 
00124 
00144       void
00145       bufferBack(const ClientID& clientID);
00146 
00147 
00167       bool
00168       bufferBack(const ClientID& clientID, double timeout);
00169 
00170 
00174       void
00175       clear();
00176       
00177 
00194       void
00195       copyBuffer(const ClientID& clientID, Type& target);
00196 
00197 
00206       size_t
00207       getMaximumLength() {return m_size;}
00208       
00209 
00253       void
00254       lockBack(const ClientID& clientID);
00255 
00256 
00278       bool
00279       lockBack(const ClientID& clientID, double timeout);
00280       
00281 
00295       void
00296       pushFront(const ClientID& clientID, const Type& element);
00297 
00298 
00325       void
00326       registerClient(ClientID& clientID,
00327                      const std::string& clientName="anonymous");
00328 
00329 
00339       void
00340       unlockBack(const ClientID& clientID);
00341       
00342     private:
00343 
00344       inline void
00345       releaseLock();
00346       
00347       
00348       inline void
00349       waitForLock(Token& token);
00350 
00351 
00352       inline bool
00353       waitForLock(Token& token, double timeout);
00354 
00355       
00356         
00357       size_t* m_headIndexPtr;
00358       size_t m_size;
00359       size_t* m_tailIndexPtr;
00360       
00361       Type** m_bufferArrayPtr;
00362       std::map<size_t, std::string>* m_clientNameMapPtr;
00363       Type* m_dataArray;
00364       size_t* m_maxNumberOfClientsPtr;
00365       size_t* m_numberOfClientsPtr;
00366 
00367       Condition m_headChangedCondition;
00368       bool* m_isLockedPtr;
00369       Condition m_isLockedCondition;
00370 
00371       // We use a pointer to ReferenceCount, rather than a
00372       // ReferenceCount instance, so that we can manage construction,
00373       // destruction, etc. inside locked sections of code.
00374       ReferenceCount* m_referenceCountPtr;
00375     };
00376 
00377   } // namespace thread
00378 
00379 } // namespace dlr
00380 
00381 
00382 /* =============       Implementation follows        ============= */
00383 
00384 #include <algorithm>
00385 #include <iostream>
00386 #include <sstream>
00387 #include <dlrThread/private.h>
00388 
00389 
00390 namespace dlr {
00391 
00392   namespace thread {
00393 
00394     /* ===========    Implementation of DistributionQueue      =========== */
00395 
00396     template <class Type>
00397     DistributionQueue<Type>::
00398     DistributionQueue(size_t maximumLength, size_t numberOfClients)
00399       : Monitor(),
00400         m_headIndexPtr(new size_t),
00401         m_size(maximumLength),
00402         m_tailIndexPtr(new size_t),
00403         m_bufferArrayPtr(new Type*),
00404         m_clientNameMapPtr(new std::map<size_t, std::string>),
00405         m_dataArray(new Type[maximumLength]),
00406         m_maxNumberOfClientsPtr(new size_t),
00407         m_numberOfClientsPtr(new size_t),
00408                 m_headChangedCondition(Monitor::createCondition()),
00409         m_isLockedPtr(new bool),
00410                 m_isLockedCondition(Monitor::createCondition()),
00411         m_referenceCountPtr(new ReferenceCount)
00412     {
00413       if(numberOfClients == 0) {
00414         DLR_THROW(ValueException, "DistributionQueue::DistributionQueue()",
00415                   "Argument numberOfClients must be greater than zero.");
00416       }
00417 
00418       // No elements have been added yet.
00419       *m_headIndexPtr = 0;
00420       *m_tailIndexPtr = 0;
00421 
00422       // But indicate that no clients have registered yet.
00423       *m_bufferArrayPtr = new Type[numberOfClients];
00424       *m_maxNumberOfClientsPtr = numberOfClients;
00425       *m_numberOfClientsPtr = 0;
00426 
00427       // And that nobody's called this->lockBack().
00428       *m_isLockedPtr = false;
00429 
00430       // Finally, allow copying.
00431       this->makeCopyable();
00432     }
00433 
00434 
00435     // The copy constructor does a shallow copy.
00436     template <class Type>
00437     DistributionQueue<Type>::
00438     DistributionQueue(const DistributionQueue<Type>& source)
00439       : Monitor(source),
00440         m_headIndexPtr(0),
00441         m_size(0),
00442         m_tailIndexPtr(0),
00443         m_bufferArrayPtr(0),
00444         m_clientNameMapPtr(0),
00445         m_dataArray(0),
00446         m_maxNumberOfClientsPtr(0),
00447         m_numberOfClientsPtr(0),
00448                 m_headChangedCondition(Monitor::createCondition(false)),
00449         m_isLockedPtr(0),
00450                 m_isLockedCondition(Monitor::createCondition(false)),
00451         m_referenceCountPtr(0)
00452     {
00453       Token token = this->getToken();
00454       // No need to wait for *m_isLockedPtr because we're not going to
00455       // read or change queue contents.
00456       
00457       m_headIndexPtr = source.m_headIndexPtr;
00458       m_size = source.m_size;
00459       m_tailIndexPtr = source.m_tailIndexPtr;
00460       m_bufferArrayPtr = source.m_bufferArrayPtr;
00461       m_clientNameMapPtr = source.m_clientNameMapPtr;
00462       m_dataArray = source.m_dataArray;
00463       m_maxNumberOfClientsPtr = source.m_maxNumberOfClientsPtr;
00464       m_numberOfClientsPtr = source.m_numberOfClientsPtr;
00465       m_headChangedCondition = source.m_headChangedCondition;
00466       m_isLockedPtr = source.m_isLockedPtr;
00467       m_isLockedCondition = source.m_isLockedCondition;
00468       m_referenceCountPtr = source.m_referenceCountPtr;
00469       ++(*m_referenceCountPtr);
00470     }
00471 
00472     
00473     // Destroys the DistributionQueue instance and releases any resources.
00474     template <class Type>
00475     DistributionQueue<Type>::
00476     ~DistributionQueue()
00477     {
00478       Token token = this->getToken();
00479       // No need to wait for *m_isLockedPtr because we're not going to
00480       // read or change queue contents unless we're the only instance
00481       // referencing this data.
00482       
00483       if(!(m_referenceCountPtr->isShared())) {
00484         delete m_headIndexPtr;
00485         delete m_tailIndexPtr;
00486         delete m_bufferArrayPtr;
00487         delete m_clientNameMapPtr;
00488         delete m_dataArray;
00489         delete m_maxNumberOfClientsPtr;
00490         delete m_numberOfClientsPtr;
00491         delete m_isLockedPtr;
00492         delete m_referenceCountPtr;
00493       } else {
00494         --(*m_referenceCountPtr);
00495       }
00496     }
00497 
00498 
00499     // The assignment operator does a shallow copy.
00500     template <class Type>
00501     DistributionQueue<Type>&
00502     DistributionQueue<Type>::
00503     operator=(const DistributionQueue<Type>& source)
00504     {
00505       if(&source != this) {
00506         Monitor::operator=(source);
00507         Token token = this->getToken();
00508         // No need to wait for *m_isLockedPtr because we're not going
00509         // to read or change queue contents unless we're the only
00510         // instance referencing the data.
00511 
00512         this->release();
00513         this->copyOther(source);
00514       }
00515     }      
00516 
00517 
00518     // This member function copies the tail element of the
00519     // DistributionQueue.
00520     template <class Type>
00521     void
00522     DistributionQueue<Type>::
00523     bufferBack(const ClientID& clientID)
00524     {
00525       // All locking, synchronizing, etc. handled inside lockBack().
00526       this->lockBack(clientID);
00527       
00528       try {
00529         (*m_bufferArrayPtr)[clientID.m_idNumber]
00530           = m_dataArray[*m_tailIndexPtr];
00531       } catch(...) {
00532         this->unlockBack(clientID);
00533         throw;
00534       }
00535 
00536       ++(*m_tailIndexPtr);
00537       if(*m_tailIndexPtr == m_size) {
00538         *m_tailIndexPtr = 0;
00539       }
00540 
00541       this->unlockBack(clientID);
00542     }
00543 
00544 
00545     // This member function copies the tail element of the
00546     // DistributionQueue.
00547     template <class Type>
00548     bool
00549     DistributionQueue<Type>::
00550     bufferBack(const ClientID& clientID, double timeout)
00551     {
00552       // All locking, synchronizing, etc. handled inside lockBack().
00553       if(this->lockBack(clientID, timeout) == false) {
00554         return false;
00555       }
00556 
00557       try {
00558         (*m_bufferArrayPtr)[clientID.m_idNumber]
00559           = m_dataArray[*m_tailIndexPtr];
00560       } catch(...) {
00561         this->unlockBack(clientID);
00562         throw;
00563       }
00564 
00565       ++(*m_tailIndexPtr);
00566       if(*m_tailIndexPtr == m_size) {
00567         *m_tailIndexPtr = 0;
00568       }
00569 
00570       this->unlockBack(clientID);
00571       return true;
00572     }
00573 
00574 
00575     // This member function empties the queue.
00576     template <class Type>
00577     void
00578     DistributionQueue<Type>::
00579     clear()
00580     {
00581       Token token = this->getToken();
00582       this->waitForLock();
00583       *m_tailIndexPtr = *m_headIndexPtr;
00584       this->releaseLock();
00585     }
00586 
00587     
00588     // This member function copies the buffered element (see member
00589     // function bufferBack() from the internal buffer into user
00590     // code.
00591     template <class Type>
00592     void
00593     DistributionQueue<Type>::
00594     copyBuffer(const ClientID& clientID, Type& target)
00595     {
00596       target = (*m_bufferArrayPtr)[clientID.m_idNumber];
00597     }
00598 
00599 
00600     // This member function locks the tail element of the DistributionQueue
00601     // instance so that it cannot fall off the back of the queue.
00602     template <class Type>
00603     void
00604     DistributionQueue<Type>::
00605     lockBack(const ClientID& clientID)
00606     {
00607       Token token = this->getToken();
00608       this->waitForLock(token);
00609 
00610       // Make sure there's data available to copy.  That is, make
00611       // sure we're not ahead of the queue, and wait for a producer
00612       // to add an element if we are.
00613       while(*m_tailIndexPtr == *m_headIndexPtr) {
00614         this->releaseLock();
00615         this->wait(m_headChangedCondition, token);
00616         this->waitForLock(token);
00617       }
00618 
00619       // Note that we still have the lock!
00620       return;
00621     }
00622 
00623 
00624     // This member function locks the tail element of the DistributionQueue
00625     // instance so that it cannot fall off the back of the queue.
00626     template <class Type>
00627     bool
00628     DistributionQueue<Type>::
00629     lockBack(const ClientID& clientID, double timeout)
00630     {
00631       // Get exclusive control of the distributionQueue.
00632       double deadline = getCurrentTime() + timeout;
00633       Token token = this->getToken(timeout);
00634       if(!token) {
00635         return false;
00636       }
00637       if(!this->waitForLock(token, deadline - getCurrentTime())) {
00638         return false;
00639       }
00640 
00641       // Make sure there's data available to copy.  That is, make
00642       // sure we're not ahead of the queue, and wait for a producer
00643       // to add an element if we are.
00644       while(*m_tailIndexPtr == *m_headIndexPtr) {
00645         this->releaseLock();
00646         if(!this->wait(m_headChangedCondition, token,
00647                        deadline - getCurrentTime())) {
00648           return false;
00649         }
00650         if(!this->waitForLock(token, deadline - getCurrentTime())) {
00651           return false;
00652         }
00653       }
00654         
00655       // Note that we still have the lock!
00656       return true;
00657     }
00658 
00659 
00660     template <class Type>
00661     void
00662     DistributionQueue<Type>::
00663     pushFront(const ClientID& clientID, const Type& element)
00664     {
00665       // This variable lets us avoid time-consuming formatting of
00666       // error messages during the time that we have *m_accessMutexPtr
00667       // locked.
00668       bool overflowFlag = false;
00669       {
00670         // Get exclusive control of the DistributionQueue.
00671         Token token = this->getToken();
00672         this->waitForLock(token);
00673         try {
00674           // Increment head, and wrap around if necessary.
00675           size_t newHeadIndex = *m_headIndexPtr + 1;
00676           if(newHeadIndex == m_size) {
00677             newHeadIndex = 0;
00678           }
00679           
00680           // Check for overflow.
00681           if(newHeadIndex == *m_tailIndexPtr) {
00682             overflowFlag = true;
00683           } else {
00684             // Proceed with the copying, etc.
00685             m_dataArray[*m_headIndexPtr] = element;
00686           }
00687           *m_headIndexPtr = newHeadIndex;
00688           
00689           // Wake up any threads that are waiting for data.
00690           this->signalAll(m_headChangedCondition);
00691         } catch(...) {
00692           this->releaseLock();
00693           throw;
00694         }
00695         this->releaseLock();
00696       }
00697 
00698       // Report any errors.
00699       if(overflowFlag) {
00700         std::ostringstream message;
00701         message << "Queue size exceeds the allowable maximum of "
00702                 << m_size << " " << "on a function call from thread " 
00703                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00704                 << ".";
00705         DLR_THROW(OverflowException, "DistributionQueue::pushFront()",
00706                   message.str().c_str());
00707       }
00708     }
00709 
00710 
00711     // This member function introduces a thread to a
00712     // DistributionQueue instance.
00713     template <class Type>
00714     void 
00715     DistributionQueue<Type>::
00716     registerClient(ClientID& clientID, const std::string& clientName)
00717     {
00718       // Registration must be strictly serialized.  Only one thread
00719       // can register at once.
00720       Token token = this->getToken();
00721       this->waitForLock(token);
00722 
00723       try {
00724         m_clientNameMapPtr->insert(
00725           std::make_pair(*m_numberOfClientsPtr, clientName));
00726         clientID.m_count = 0;
00727         clientID.m_idNumber = (*m_numberOfClientsPtr)++;
00728         clientID.m_index = 0;
00729 
00730         // If we now have more than the expected number of clients, we
00731         // have to resize our internal storage.
00732         if(*m_numberOfClientsPtr > *m_maxNumberOfClientsPtr) {
00733           size_t oldMaxNumberOfClients = *m_maxNumberOfClientsPtr;
00734           Type* oldBufferArray = *m_bufferArrayPtr;
00735         
00736           *m_maxNumberOfClientsPtr *= 2;
00737           *m_bufferArrayPtr = new Type[*m_maxNumberOfClientsPtr];
00738 
00739           std::copy(oldBufferArray, oldBufferArray + oldMaxNumberOfClients,
00740                     *m_bufferArrayPtr);
00741           delete[] oldBufferArray;        
00742         }
00743       } catch(...) {
00744         this->releaseLock();
00745         throw;
00746       }
00747       this->releaseLock();
00748     }
00749     
00750 
00751     // This member function releases a lock acquired by
00752     // lockBack().
00753     template <class Type>
00754     void
00755     DistributionQueue<Type>::
00756     unlockBack(const ClientID& clientID)
00757     {
00758       Token token = this->getToken();
00759       this->releaseLock();
00760     }
00761 
00762     
00763     template <class Type>
00764     void
00765     DistributionQueue<Type>::
00766     releaseLock()
00767     {
00768       *m_isLockedPtr = false;
00769       this->signalOne(m_isLockedCondition);
00770     }
00771 
00772     
00773     template <class Type>
00774     void
00775     DistributionQueue<Type>::
00776     waitForLock(Token& token)
00777     {
00778       while(*m_isLockedPtr) {
00779         this->wait(this->m_isLockedCondition, token);
00780       }
00781       *m_isLockedPtr = true;
00782     }
00783 
00784 
00785     template <class Type>
00786     bool
00787     DistributionQueue<Type>::
00788     waitForLock(Token& token, double timeout)
00789     {
00790       double deadline = getCurrentTime() + timeout;
00791       while(*m_isLockedPtr) {
00792         if(this->wait(this->m_isLockedCondition, token,
00793                       deadline - getCurrentTime()) == false) {
00794           return false;
00795         }
00796       }
00797       *m_isLockedPtr = true;
00798       return true;
00799     }
00800     
00801     
00802   } // namespace thread
00803 
00804 } // namespace dlr
00805 
00806 #endif /* #ifndef _DLR_THREAD_DISTRIBUTIONQUEUE_H_ */

Generated on Mon Jul 9 20:34:02 2007 for dlrLibs Utility Libraries by  doxygen 1.5.2