broadcastQueue.h

Go to the documentation of this file.
00001 
00014 #ifndef _DLR_THREAD_BROADCASTQUEUE_H_
00015 #define _DLR_THREAD_BROADCASTQUEUE_H_
00016 
00017 #include <map>
00018 #include <boost/thread.hpp>
00019 #include <dlrThread/clientID.h>
00020 #include <dlrThread/exception.h>
00021 
00022 
00023 namespace dlr {
00024 
00025   namespace thread {
00026 
00053     template <class Type>
00054     class BroadcastQueue
00055     {
00056     public:
00057 
00069       typedef ClientID<Type> ClientID;
00070 
00071 
00089       BroadcastQueue(size_t maximumLength, size_t numberOfClients=2);
00090 
00091 
00099       BroadcastQueue(const BroadcastQueue<Type>& other);
00100 
00101 
00105       virtual
00106       ~BroadcastQueue();
00107 
00108 
00117       BroadcastQueue<Type>&
00118       operator=(const BroadcastQueue<Type>& other);
00119 
00120 
00134       void
00135       catchUp(ClientID& clientID);
00136       
00137 
00166       void
00167       copyBack(const ClientID& clientID, Type& target);
00168 
00169 
00193       bool
00194       copyBack(const ClientID& clientID, Type& target, double timeout);
00195 
00196 
00204       size_t
00205       getMaximumLength() {return m_size;}
00206       
00207 
00256       void
00257       lockBack(const ClientID& clientID);
00258 
00259 
00280       bool
00281       lockBack(const ClientID& clientID, double timeout);
00282       
00283 
00298       inline void
00299       popBack(ClientID& clientID);
00300       
00301 
00320       void
00321       pushFront(const ClientID& clientID, const Type& element);
00322 
00323 
00361       void
00362       registerClient(ClientID& clientID,
00363                      const std::string& clientName="anonymous");
00364 
00365 
00393       void
00394       synchronize(ClientID& clientID);
00395 
00396 
00422       bool
00423       synchronize(ClientID& clientID, double timeout);
00424 
00425 
00471       void
00472       synchronizeQuorum(ClientID& clientID, size_t quorumSize);
00473 
00474 
00509       bool
00510       synchronizeQuorum(ClientID& clientID, size_t quorumSize, double timeout);
00511       
00512       
00522       void
00523       unlockBack(const ClientID& clientID);
00524       
00525     private:
00526 
00535       void
00536       copyOther(const BroadcastQueue<Type>& other);
00537 
00538       
00563       inline void
00564       declareReadLock(const ClientID& clientID,
00565                       bool& overflowFlag,
00566                       size_t& headElementNumber);
00567       
00568 
00583       void 
00584       handleSynchronize(ClientID& clientID,
00585                         size_t targetNumber,
00586                         boost::timed_mutex::scoped_lock& syncLock);
00587       
00588 
00612       bool
00613       handleSynchronize(ClientID& clientID,
00614                         size_t targetNumber,
00615                         boost::xtime xtimeout,
00616                         boost::timed_mutex::scoped_timed_lock& syncLock);
00617 
00618 
00623       void
00624       releaseResources();
00625       
00626       
00627       size_t* m_countPtr;
00628       size_t* m_headIndexPtr;
00629       size_t m_size;
00630       
00631       std::map<size_t, std::string>* m_clientNameMapPtr;
00632       Type* m_dataArray;
00633       size_t* m_maxNumberOfClientsPtr;
00634       size_t* m_numberOfClientsPtr;
00635       size_t* m_readLockCountArray;
00636       size_t** m_readLockingFlagsArray;
00637       size_t* m_syncClientCountPtr;
00638       size_t* m_syncElementCountPtr;
00639       size_t* m_syncIndexPtr;
00640       size_t* m_quorumSizePtr;
00641 
00642       boost::condition* m_headChangedConditionPtr;
00643       boost::timed_mutex* m_headIndexMutexPtr;
00644       // // This code left around in case we try to go back to
00645       // // individual mutexes for data elements.
00646       // boost::timed_mutex* m_mutexArray;
00647       boost::timed_mutex* m_numberOfClientsMutexPtr;
00648 
00649       boost::condition* m_syncConditionPtr;
00650       boost::timed_mutex* m_syncMutexPtr;
00651 
00652       // We don't use the pre-written ReferenceCount class becuase
00653       // it's not thread-safe.
00654       size_t* m_referenceCountPtr;
00655     };
00656 
00657   } // namespace thread
00658 
00659 } // namespace dlr
00660 
00661 
00662 /* =============       Implementation follows        ============= */
00663 
00664 #include <algorithm>
00665 #include <iostream>
00666 #include <sstream>
00667 #include <dlrThread/private.h>
00668 
00669 
00670 namespace dlr {
00671 
00672   namespace thread {
00673 
00674     /* ===========    Implementation of BroadcastQueue      =========== */
00675 
00676     template <class Type>
00677     BroadcastQueue<Type>::
00678     BroadcastQueue(size_t maximumLength, size_t numberOfClients)
00679       : m_countPtr(new size_t),
00680         m_headIndexPtr(new size_t),
00681         m_size(maximumLength),
00682         m_clientNameMapPtr(new std::map<size_t, std::string>),
00683         m_dataArray(new Type[maximumLength]),
00684         m_maxNumberOfClientsPtr(new size_t),
00685         m_numberOfClientsPtr(new size_t),
00686         m_readLockCountArray(new size_t[maximumLength]),
00687         m_readLockingFlagsArray(new size_t*[maximumLength]),
00688         m_syncClientCountPtr(new size_t),
00689         m_syncElementCountPtr(new size_t),
00690         m_syncIndexPtr(new size_t),
00691         m_quorumSizePtr(new size_t),
00692         m_headChangedConditionPtr(new boost::condition),
00693         m_headIndexMutexPtr(new boost::timed_mutex),
00694         // // This code left around in case we try to go back to
00695         // // individual mutexes for data elements.
00696         // m_mutexArray(new boost::timed_mutex[maximumLength]),
00697         m_numberOfClientsMutexPtr(new boost::timed_mutex),
00698         m_syncConditionPtr(new boost::condition),
00699         m_syncMutexPtr(new boost::timed_mutex),
00700         m_referenceCountPtr(new size_t)
00701     {
00702       if(numberOfClients == 0) {
00703         DLR_THROW(ValueException, "BroadcastQueue::BroadcastQueue()",
00704                   "Argument numberOfClients must be greater than zero.");
00705       }
00706 
00707       // No elements have been added yet.
00708       *m_countPtr = 0;
00709       *m_headIndexPtr = 0;
00710 
00711       // Set client capacity as instructed.
00712       *m_maxNumberOfClientsPtr = numberOfClients;
00713 
00714       // But indicate that no clients have registered yet.
00715       *m_numberOfClientsPtr = 0;
00716 
00717       // For each element in the queue.
00718       for(size_t index0 = 0; index0 < m_size; ++index0) {
00719         // Allocate storage for the expected number of clients.
00720         m_readLockingFlagsArray[index0] =
00721           new size_t[*m_maxNumberOfClientsPtr];
00722         size_t* beginIterator = m_readLockingFlagsArray[index0];
00723         size_t* endIterator = beginIterator + (*m_maxNumberOfClientsPtr);
00724         std::fill(beginIterator, endIterator, 0);
00725 
00726         // Indicate that no client is locking this element.
00727         m_readLockCountArray[index0] = 0;
00728       }
00729 
00730       // Initially, no clients are waiting for sync events.
00731       *m_syncClientCountPtr = 0;
00732       *m_syncElementCountPtr = 0;
00733       *m_syncIndexPtr = 0;
00734       *m_quorumSizePtr = 0;
00735       *m_referenceCountPtr = 1;
00736     }
00737 
00738 
00739     // The copy constructor does a shallow copy.
00740     template <class Type>
00741     BroadcastQueue<Type>::
00742     BroadcastQueue(const BroadcastQueue<Type>& other)
00743       : m_countPtr(0),
00744         m_headIndexPtr(0),
00745         m_size(0),
00746         m_clientNameMapPtr(0),
00747         m_dataArray(0),
00748         m_maxNumberOfClientsPtr(0),
00749         m_numberOfClientsPtr(0),
00750         m_readLockCountArray(0),
00751         m_readLockingFlagsArray(0),
00752         m_syncClientCountPtr(0),
00753         m_syncElementCountPtr(0),
00754         m_syncIndexPtr(0),
00755         m_quorumSizePtr(0),
00756         m_headChangedConditionPtr(0),
00757         m_headIndexMutexPtr(0),
00758         m_numberOfClientsMutexPtr(0),
00759         m_syncConditionPtr(0),
00760         m_syncMutexPtr(0),
00761         m_referenceCountPtr(0)
00762     {
00763       this->copyOther(other);
00764     }
00765 
00766     
00767     // Destroys the BroadcastQueue instance and releases any resources.
00768     template <class Type>
00769     BroadcastQueue<Type>::
00770     ~BroadcastQueue()
00771     {
00772       this->releaseResources();
00773     }
00774 
00775 
00776     // The assignment operator does a shallow copy.
00777     template <class Type>
00778     BroadcastQueue<Type>&
00779     BroadcastQueue<Type>::
00780     operator=(const BroadcastQueue<Type>& other)
00781     {
00782       if(&other != this) {
00783         this->releaseResources();
00784         this->copyOther(other);
00785       }
00786     }      
00787 
00788 
00789     // This member function moves the calling thread to the head of
00790     // the BroadcastQueue.
00791     template <class Type>
00792     void
00793     BroadcastQueue<Type>::
00794     catchUp(ClientID& clientID)
00795     {
00796       boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
00797       clientID.m_count = *m_countPtr;
00798       clientID.m_index = *m_headIndexPtr;
00799     }
00800 
00801     
00802     // This member function copies the tail element of the
00803     // BroadcastQueue.
00804     template <class Type>
00805     void
00806     BroadcastQueue<Type>::
00807     copyBack(const ClientID& clientID, Type& target)
00808     {
00809       this->lockBack(clientID);
00810 
00811       try {
00812         target = m_dataArray[clientID.m_index];
00813       } catch(...) {
00814         this->unlockBack(clientID);
00815         throw;
00816       }
00817 
00818       this->unlockBack(clientID);
00819     }
00820 
00821 
00822     // This member function copies the tail element of the
00823     // BroadcastQueue.
00824     template <class Type>
00825     bool
00826     BroadcastQueue<Type>::
00827     copyBack(const ClientID& clientID, Type& target, double timeout)
00828     {
00829       if(this->lockBack(clientID, timeout) == false) {
00830         return false;
00831       }
00832 
00833       try {
00834         target = m_dataArray[clientID.m_index];
00835       } catch(...) {
00836         this->unlockBack(clientID);
00837         throw;
00838       }
00839 
00840       this->unlockBack(clientID);
00841       return true;
00842     }
00843 
00844 
00845     // This member function locks the tail element of the BroadcastQueue
00846     // instance so that it cannot fall off the back of the queue.
00847     template <class Type>
00848     void
00849     BroadcastQueue<Type>::
00850     lockBack(const ClientID& clientID)
00851     {
00852       // These variables let us avoid time-consuming formatting of
00853       // error messages during the time that we have
00854       // *m_headIndexMutexPtr locked.
00855       bool overflowFlag = false;
00856       size_t headElementNumber = 0;
00857       
00858       // Get exclusive control of the broadcastQueue, check that we're not
00859       // too far ahead or behind, and mark our current element as
00860       // being read.  The scoped_lock instance will be released when
00861       // it goes out of scope.
00862       {
00863         boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
00864 
00865         // // This code left around in case we try to go back to
00866         // // individual mutexes for data elements.
00867         // boost::timed_mutex::scoped_lock elementLock(
00868         //   m_mutexArray[clientID.m_index]);
00869         
00870         // Make sure there's data available to copy.  That is, make
00871         // sure we're not ahead of the queue, and wait for a producer
00872         // to add an element if we are.
00873         while(clientID.m_count >= (*m_countPtr)) {
00874           m_headChangedConditionPtr->wait(headLock);
00875         }
00876 
00877         // Mark the appropriate element as being read by the calling
00878         // thread.
00879         this->declareReadLock(clientID, overflowFlag, headElementNumber);
00880         
00881       } // Release the lock.
00882 
00883       if(overflowFlag) {
00884         std::ostringstream message;
00885         message << "Thread "
00886                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00887                 << " "
00888                 << "has fallen irretrievably far behind the front of the "
00889                 << "queue.  Accessed element is number "
00890                 << clientID.m_count << ", head element is number "
00891                 << headElementNumber << ".";
00892         DLR_THROW(OverflowException, "BroadcastQueue::lockBack()",
00893                   message.str().c_str());
00894       }
00895     }
00896 
00897 
00898     // This member function locks the tail element of the BroadcastQueue
00899     // instance so that it cannot fall off the back of the queue.
00900     template <class Type>
00901     bool
00902     BroadcastQueue<Type>::
00903     lockBack(const ClientID& clientID, double timeout)
00904     {
00905       // Argument checking.
00906       if(timeout < 0.0) {
00907         timeout = 0.0;
00908       }
00909       boost::xtime xtimeout;
00910       if(getXTime(timeout, xtimeout) == false) {
00911         // Note(xxx): should we throw an exception here?
00912         return false;
00913       }
00914       
00915       // These variables let us avoid time-consuming formatting of
00916       // error messages during the time that we have
00917       // *m_headIndexMutexPtr locked.
00918       bool overflowFlag = false;
00919       size_t headElementNumber = 0;
00920 
00921       // Get exclusive control of the broadcastQueue, check that we're not
00922       // too far ahead or behind, and mark our current element as
00923       // being read.  The scoped_lock instance will be released when
00924       // it goes out of scope.
00925       try {
00926         // This call throws a boost::lock_error if the timeout expires.
00927         boost::timed_mutex::scoped_timed_lock headLock(
00928           *m_headIndexMutexPtr, xtimeout);
00929 
00930         // Make sure there's data available to copy.  That is, make
00931         // sure we're not ahead of the queue, and wait for a producer
00932         // to add an element if we are.
00933         while(clientID.m_count >= (*m_countPtr)) {
00934           if(m_headChangedConditionPtr->timed_wait(headLock, xtimeout)
00935              == false) {
00936             // No new data before timeout expires.
00937             return false;
00938           }
00939         }
00940 
00941         // Mark the appropriate element as being read by the calling
00942         // thread.
00943         this->declareReadLock(clientID, overflowFlag, headElementNumber);
00944         
00945       } catch(const boost::lock_error&) {
00946         // The timeout expired before we could get a lock.
00947         return false;
00948       }
00949 
00950       if(overflowFlag) {
00951         std::ostringstream message;
00952         message << "Thread "
00953                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00954                 << " "
00955                 << "has fallen irretrievably far behind the front of the "
00956                 << "queue.  Accessed element is number "
00957                 << clientID.m_count << ", head element is number "
00958                 << headElementNumber << ".";
00959         DLR_THROW(OverflowException, "BroadcastQueue::lockBack()",
00960                   message.str().c_str());
00961       }
00962       return true;
00963     }
00964 
00965 
00966     template <class Type>
00967     inline void
00968     BroadcastQueue<Type>::
00969     popBack(ClientID& clientID)
00970     {
00971       ++clientID.m_count;
00972       ++clientID.m_index;
00973       if(clientID.m_index >= m_size) {
00974         clientID.m_index = 0;
00975       }
00976       if(clientID.m_count == std::numeric_limits<size_t>::max()) {
00977         std::ostringstream message;
00978         message << "Thread "
00979                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00980                 << " "
00981                 << "has exceeded the maximum allowable queue position of "
00982                 << std::numeric_limits<size_t>::max() - 1 << ".";
00983         DLR_THROW(LogicException, "BroadcastQueue::popBack()",
00984                   message.str().c_str());
00985       }
00986     }
00987 
00988 
00989     template <class Type>
00990     void
00991     BroadcastQueue<Type>::
00992     pushFront(const ClientID& clientID, const Type& element)
00993     {
00994       // These variables let us avoid time-consuming formatting of
00995       // error messages during the time that we have
00996       // *m_headIndexMutexPtr locked.
00997       bool readLockedFlag = false;
00998       bool countOverflowFlag = false;
00999       size_t threadIDNumber = 0;
01000 
01001       // Get exclusive control of the BroadcastQueue, check that noone's
01002       // read-locked an element that's so far behind the head that the
01003       // head has wrapped around and is about to stomp on the locked
01004       // data, copy the new element into it, and advance the head.
01005       {
01006         // This lock will be automatically released when it goes out
01007         // of scope.
01008         boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01009 
01010         if(m_readLockCountArray[*m_headIndexPtr] != 0) {
01011           // Ack! This element is read-locked.  Figure out which
01012           // thread is the culprit.
01013           readLockedFlag = true;
01014           size_t* flagsPtr = m_readLockingFlagsArray[*m_headIndexPtr];
01015           while(threadIDNumber < (*m_numberOfClientsPtr)) {
01016             if(flagsPtr[threadIDNumber] != 0) {
01017               break;
01018             }
01019             ++threadIDNumber;
01020           }
01021         } else {
01022           // Not read-locked, proceed with the copying, etc.
01023           m_dataArray[*m_headIndexPtr] = element;
01024           ++(*m_headIndexPtr);
01025           ++(*m_countPtr);
01026           if((*m_headIndexPtr) >= m_size) {
01027             (*m_headIndexPtr) = 0;
01028           }
01029           if(*m_countPtr == std::numeric_limits<size_t>::max()) {
01030             countOverflowFlag = true;
01031           }
01032 
01033           // Wake up any threads that are waiting for data.
01034           m_headChangedConditionPtr->notify_all();
01035         }
01036       }
01037 
01038       // Report any errors.
01039       if(readLockedFlag) {
01040         // Sanity check.
01041         if(threadIDNumber == *m_numberOfClientsPtr) {
01042           DLR_THROW(LogicException, "BroadcastQueue::pushFront()",
01043                     "Read lock accounting is in an inconsistent state.");
01044         }
01045         std::ostringstream message;
01046         message << "Thread "
01047                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
01048                 << " "
01049                 << "can't advance the queue because thread "
01050                 << (m_clientNameMapPtr->find(threadIDNumber))->second
01051                 << " "
01052                 << "is still read-locking the tail element.";
01053         DLR_THROW(OverflowException, "BroadcastQueue::pushFront()",
01054                   message.str().c_str());
01055       }
01056       if(countOverflowFlag) {
01057         std::ostringstream message;
01058         message << "Queue size exceeds the allowable maximum of "
01059                 << std::numeric_limits<size_t>::max() - 1 << " "
01060                 << "on a function call from thread " 
01061                 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
01062                 << ".";
01063         DLR_THROW(OverflowException, "BroadcastQueue::pushFront()",
01064                   message.str().c_str());
01065       }
01066     }
01067 
01068 
01069     // This member function returns a ClientID instance that should
01070     // be used by the calling thread in all subsequent interactions
01071     // with the BroadcastQueue instance.
01072     template <class Type>
01073     void 
01074     BroadcastQueue<Type>::
01075     registerClient(ClientID& clientID, const std::string& clientName)
01076     {
01077       // Registration must be strictly serialized.  Only one thread
01078       // can register at once.  Also, producer threads must be locked
01079       // out so they don't throw an exception when they can't get
01080       // locks on m_mutexArray elements.
01081       boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01082 
01083       // If adding another client will overflow our bookkeeping space,
01084       // then allocate a bigger bookkeeping space and copy the old
01085       // data into it.
01086       if(*m_numberOfClientsPtr >= (*m_maxNumberOfClientsPtr)) {
01087         size_t newMaxNumberOfClients = (*m_maxNumberOfClientsPtr) << 1;
01088         for(size_t index0 = 0; index0 < m_size; ++index0) {
01089           {
01090             // // This code left around in case we try to go back to
01091             // // individual mutexes for data elements.
01092             // boost::timed_mutex::scoped_lock elementLock(
01093             //   m_mutexArray[index0]);
01094           
01095             size_t* newFlagsArray = new size_t[newMaxNumberOfClients];
01096             size_t* beginIterator = m_readLockingFlagsArray[index0];
01097             size_t* endIterator = beginIterator + (*m_maxNumberOfClientsPtr);
01098             std::copy(beginIterator, endIterator, newFlagsArray);
01099             
01100             beginIterator = newFlagsArray;
01101             endIterator = beginIterator + newMaxNumberOfClients;
01102             beginIterator += (*m_maxNumberOfClientsPtr);
01103             std::fill(beginIterator, endIterator, 0);
01104             
01105             delete[] m_readLockingFlagsArray[index0];
01106             m_readLockingFlagsArray[index0] = newFlagsArray;
01107           }
01108         }
01109         *m_maxNumberOfClientsPtr = newMaxNumberOfClients;
01110       }
01111       m_clientNameMapPtr->insert(
01112         std::make_pair(*m_numberOfClientsPtr, clientName));
01113       clientID.m_count = 0;
01114       clientID.m_idNumber = (*m_numberOfClientsPtr)++;
01115       clientID.m_index = 0;
01116     }
01117     
01118 
01119     // This member function blocks until all clients of the
01120     // BroadcastQueue have called it, then moves the client to the head
01121     // of the queue.
01122     template <class Type>
01123     void 
01124     BroadcastQueue<Type>::
01125     synchronize(ClientID& clientID)
01126     {
01127       boost::timed_mutex::scoped_lock syncLock(*m_syncMutexPtr);
01128       
01129       if((*m_syncClientCountPtr) != 0) {
01130         if((*m_quorumSizePtr) != 0) {
01131           std::ostringstream message;
01132           message << "Can't synchronize with all registered clients "
01133                   << "while another thread is waiting for a quorum of size "
01134                   << (*m_quorumSizePtr) << "."; 
01135           DLR_THROW(StateException, "BroadcastQueue::synchronize()",
01136                     message.str().c_str());
01137         }
01138       }
01139       (*m_quorumSizePtr) = 0;
01140 
01141       this->handleSynchronize(clientID, (*m_numberOfClientsPtr), syncLock);
01142     }
01143 
01144 
01145     // This member function blocks until all clients of the
01146     // BroadcastQueue have called it, then moves the client to the head
01147     // of the queue.
01148     template <class Type>
01149     bool
01150     BroadcastQueue<Type>::
01151     synchronize(ClientID& clientID, double timeout)
01152     {
01153       // Argument checking.
01154       if(timeout < 0.0) {
01155         timeout = 0.0;
01156       }
01157       boost::xtime xtimeout;
01158       if(getXTime(timeout, xtimeout) == false) {
01159         // Note(xxx): should we throw an exception here?
01160         return false;
01161       }
01162 
01163       bool returnValue = false;
01164       try {
01165         // This call throws a boost::lock_error if the timeout expires.
01166         boost::timed_mutex::scoped_timed_lock syncLock(
01167           *m_syncMutexPtr, xtimeout);
01168 
01169         // State checking.
01170         if((*m_syncClientCountPtr) != 0) {
01171           if((*m_quorumSizePtr) != 0) {
01172             std::ostringstream message;
01173             message << "Can't synchronize with all registered clients "
01174                     << "while another thread is waiting for a quorum of size "
01175                     << (*m_quorumSizePtr) << "."; 
01176             DLR_THROW(StateException, "BroadcastQueue::synchronize()",
01177                       message.str().c_str());
01178           }
01179         }
01180         (*m_quorumSizePtr) = 0;
01181         
01182         returnValue =
01183           this->handleSynchronize(
01184             clientID, (*m_numberOfClientsPtr), xtimeout, syncLock);
01185       } catch(const boost::lock_error&) {
01186         // The timeout expired before we could get a lock.
01187       }
01188       return returnValue;
01189     }
01190 
01191 
01192     // This member function blocks until the specified number of
01193     // BroadcastQueue clients have called it, then moves the client to
01194     // the head of the queue.
01195     template <class Type>
01196     void
01197     BroadcastQueue<Type>::
01198     synchronizeQuorum(ClientID& clientID, size_t quorumSize)
01199     {
01200       boost::timed_mutex::scoped_lock syncLock(*m_syncMutexPtr);
01201 
01202       if(quorumSize == 0) {
01203         quorumSize = (*m_numberOfClientsPtr);
01204       }
01205       
01206       if((*m_syncClientCountPtr) != 0) {
01207         if((*m_quorumSizePtr) == 0) {
01208           DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01209                     "Can't start a quorum while another thread is blocking "
01210                     "in member function synchronize()");         
01211         } else if((*m_quorumSizePtr) != quorumSize) {
01212           std::ostringstream message;
01213           message << "Can't start a quorum of size " << quorumSize << " "
01214                   << "while another thread is waiting for a quorum of size "
01215                   << (*m_quorumSizePtr) << ".";
01216           DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01217                     message.str().c_str());
01218         }
01219       }
01220       (*m_quorumSizePtr) = quorumSize;
01221 
01222       this->handleSynchronize(clientID, (*m_quorumSizePtr), syncLock);
01223     }
01224 
01225 
01226     // This member function blocks until the specified number of
01227     // BroadcastQueue clients have called it, then moves the client to
01228     // the head of the queue.
01229     template <class Type>
01230     bool
01231     BroadcastQueue<Type>::
01232     synchronizeQuorum(ClientID& clientID, size_t quorumSize, double timeout)
01233     {
01234       // Argument checking.
01235       if(timeout < 0.0) {
01236         timeout = 0.0;
01237       }
01238       boost::xtime xtimeout;
01239       if(getXTime(timeout, xtimeout) == false) {
01240         // Note(xxx): should we throw an exception here?
01241         return false;
01242       }
01243       
01244       bool returnValue = false;
01245       try {
01246         // This call throws a boost::lock_error if the timeout expires.
01247         boost::timed_mutex::scoped_timed_lock syncLock(
01248           *m_syncMutexPtr, xtimeout);
01249 
01250         // State checking.
01251         if(quorumSize == 0) {
01252           quorumSize = (*m_numberOfClientsPtr);
01253         }
01254       
01255         if((*m_syncClientCountPtr) != 0) {
01256           if((*m_quorumSizePtr) == 0) {
01257             DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01258                       "Can't start a quorum while another thread is blocking "
01259                       "in member function synchronize()");         
01260           } else if((*m_quorumSizePtr) != quorumSize) {
01261             std::ostringstream message;
01262             message << "Can't start a quorum of size " << quorumSize << " "
01263                     << "while another thread is waiting for a quorum of size "
01264                     << (*m_quorumSizePtr) << ".";
01265             DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01266                       message.str().c_str());
01267           }
01268         }
01269         (*m_quorumSizePtr) = quorumSize;
01270         
01271         returnValue =
01272           this->handleSynchronize(
01273             clientID, (*m_quorumSizePtr), xtimeout, syncLock);
01274       } catch(const boost::lock_error&) {
01275         // The timeout expired before we could get a lock.
01276       }
01277       return returnValue;
01278     }
01279 
01280     
01281     // This member function releases a read lock acquired by
01282     // lockBack().
01283     template <class Type>
01284     void
01285     BroadcastQueue<Type>::
01286     unlockBack(const ClientID& clientID)
01287     {
01288       // Get exclusive control of the broadcastQueue, and undo the read
01289       // lock.  The scoped_lock instance will be released when it goes
01290       // out of scope.
01291       {
01292         boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01293 
01294         // m_readLockingFlagsArray keeps track of which threads are
01295         // read-locking which elements.  Note that lockedFlag is a
01296         // reference.
01297         size_t& lockedFlag =
01298           m_readLockingFlagsArray[clientID.m_index][clientID.m_idNumber];
01299 
01300         // We only need to release the lock if this client is actually
01301         // locking this element.
01302         if(lockedFlag != 0) {
01303           --lockedFlag;
01304           --(m_readLockCountArray[clientID.m_index]);
01305         }
01306       } // Release the lock.
01307     }
01308     
01309     
01310     /* =========== Private member functions of BroadcastQueue =========== */
01311 
01312     // This private member function is called by the copy
01313     // constructor and assignment operator to copy another
01314     // BroadcastQueue instance in a thread-safe way.
01315     template <class Type>
01316     void
01317     BroadcastQueue<Type>::
01318     copyOther(const BroadcastQueue<Type>& other)
01319     {
01320       {
01321         boost::timed_mutex::scoped_lock
01322           headLock(*(other.m_headIndexMutexPtr));
01323 
01324         m_countPtr = other.m_countPtr;
01325         m_headIndexPtr = other.m_headIndexPtr;
01326         m_size = other.m_size;
01327         m_clientNameMapPtr = other.m_clientNameMapPtr;
01328         m_dataArray = other.m_dataArray;
01329         m_maxNumberOfClientsPtr = other.m_maxNumberOfClientsPtr;
01330         m_numberOfClientsPtr = other.m_numberOfClientsPtr;
01331         m_readLockCountArray = other.m_readLockCountArray;
01332         m_readLockingFlagsArray = other.m_readLockingFlagsArray;
01333         m_syncClientCountPtr = other.m_syncClientCountPtr;
01334         m_syncElementCountPtr = other.m_syncElementCountPtr;
01335         m_syncIndexPtr = other.m_syncIndexPtr;
01336         m_quorumSizePtr = other.m_quorumSizePtr;
01337         m_headIndexMutexPtr = other.m_headIndexMutexPtr;
01338         m_headChangedConditionPtr = other.m_headChangedConditionPtr;
01339         // // This code left around in case we try to go back to
01340         // // individual mutexes for data elements.
01341         // m_mutexArray = other.m_mutexArray;
01342         m_numberOfClientsMutexPtr = other.m_numberOfClientsMutexPtr;
01343         m_syncConditionPtr = other.m_syncConditionPtr;
01344         m_syncMutexPtr = other.m_syncMutexPtr;
01345         m_referenceCountPtr = other.m_referenceCountPtr;
01346         ++(*m_referenceCountPtr);
01347       }
01348     }
01349 
01350     
01351     // This private member function should only be called when
01352     // m_headIndexMutexPtr is locked.
01353     template <class Type>
01354     inline void
01355     BroadcastQueue<Type>::
01356     declareReadLock(const ClientID& clientID,
01357                     bool& overflowFlag,
01358                     size_t& headElementNumber)
01359     {
01360       // Check that we're not too behind the head of the queue, and
01361       // mark our current element as being read.
01362       if((clientID.m_count + m_size) < (*m_countPtr)) {
01363         overflowFlag = true;
01364         headElementNumber = *m_countPtr;
01365       } else {
01366         // No overflow.
01367         ++(m_readLockingFlagsArray[clientID.m_index][clientID.m_idNumber]);
01368         ++(m_readLockCountArray[clientID.m_index]);
01369       }
01370     }
01371     
01372 
01373     // This private member function does the actual mechanics of
01374     // synchronizing threads.
01375     template <class Type>
01376     void 
01377     BroadcastQueue<Type>::
01378     handleSynchronize(ClientID& clientID, size_t targetNumber,
01379                       boost::timed_mutex::scoped_lock& syncLock)
01380     {
01381       // We know (because one of our arguments is a lock object) that
01382       // it's safe to mess with the m_sync* data members.
01383       ++(*m_syncClientCountPtr);
01384       if((*m_syncClientCountPtr) >= targetNumber) {
01385         {
01386           // We need to lock headlock before accessing m_countPtr and
01387           // m_headIndexPtr so they don't change out from under us.
01388           boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01389           *m_syncElementCountPtr = *m_countPtr;
01390           *m_syncIndexPtr = *m_headIndexPtr;
01391         }
01392         // Wake up any threads that are waiting for sync.
01393         (*m_syncClientCountPtr) = 0;
01394         m_syncConditionPtr->notify_all();
01395       } else {
01396         m_syncConditionPtr->wait(syncLock);
01397       }
01398       clientID.m_count = *m_syncElementCountPtr;
01399       clientID.m_index = *m_syncIndexPtr;
01400     }
01401     
01402 
01403     // This private member function does the actual mechanics of
01404     // synchronizing threads.
01405     template <class Type>
01406     bool
01407     BroadcastQueue<Type>::
01408     handleSynchronize(ClientID& clientID,
01409                       size_t targetNumber,
01410                       boost::xtime xtimeout,
01411                       boost::timed_mutex::scoped_timed_lock& syncLock)
01412     {
01413       // We know (because one of our arguments is a lock object) that
01414       // it's safe to mess with the m_sync* data members.
01415       ++(*m_syncClientCountPtr);
01416       if((*m_syncClientCountPtr) >= targetNumber) {
01417         try {
01418           // We need to lock headlock before accessing m_countPtr and
01419           // m_headIndexPtr so they don't change out from under us.
01420           boost::timed_mutex::scoped_timed_lock
01421             headLock(*m_headIndexMutexPtr, xtimeout);
01422           *m_syncElementCountPtr = *m_countPtr;
01423           *m_syncIndexPtr = *m_headIndexPtr;
01424         } catch(const boost::lock_error&) {
01425           --(*m_syncClientCountPtr);
01426           return false;
01427         }
01428         // Wake up any threads that are waiting for sync.
01429         (*m_syncClientCountPtr) = 0;
01430         m_syncConditionPtr->notify_all();
01431       } else {
01432         if(m_syncConditionPtr->timed_wait(syncLock, xtimeout) == false) {
01433           // Sync event didn't happen.  Bail out.
01434           --(*m_syncClientCountPtr);
01435           return false;
01436         }
01437       }
01438       clientID.m_count = *m_syncElementCountPtr;
01439       clientID.m_index = *m_syncIndexPtr;
01440       return true;
01441     }
01442 
01443 
01444     // This protected member function is called by the destructor
01445     // and copy constructor to decrement reference counts, etc.
01446     template <class Type>
01447     void
01448     BroadcastQueue<Type>::    
01449     releaseResources()
01450     {
01451       bool isShared = true;
01452       {
01453         boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01454         if(--(*m_referenceCountPtr) == 0) {
01455           isShared = false;
01456         }
01457       }
01458         
01459       if(!isShared) {
01460         delete m_countPtr;
01461         delete m_headIndexPtr;
01462         delete m_clientNameMapPtr;
01463         delete[] m_dataArray;
01464         delete m_maxNumberOfClientsPtr;
01465         delete m_numberOfClientsPtr;
01466         delete[] m_readLockCountArray;
01467         for(size_t index0 = 0; index0 < m_size; ++index0) {
01468           delete[] m_readLockingFlagsArray[index0];
01469         }
01470         delete[] m_readLockingFlagsArray;
01471         delete m_syncClientCountPtr;
01472         delete m_syncElementCountPtr;
01473         delete m_syncIndexPtr;
01474         delete m_quorumSizePtr;
01475         delete m_headChangedConditionPtr;
01476         delete m_headIndexMutexPtr;
01477         // // This code left around in case we try to go back to
01478         // // individual mutexes for data elements.
01479         // delete[] m_mutexArray;
01480         delete m_numberOfClientsMutexPtr;
01481         delete m_syncConditionPtr;
01482         delete m_syncMutexPtr;
01483         delete m_referenceCountPtr;
01484       }
01485     }
01486     
01487   } // namespace thread
01488 
01489 } // namespace dlr
01490 
01491 
01492 
01493 #endif /* #ifndef _DLR_THREAD_BROADCASTQUEUE_H_ */

Generated on Wed Jun 25 13:47:22 2008 for dlrUtilities Utility Library by  doxygen 1.5.5