00001
00014 #ifndef _DLR_THREAD_BROADCASTQUEUE_H_
00015 #define _DLR_THREAD_BROADCASTQUEUE_H_
00016
00017 #include <map>
00018 #include <boost/thread.hpp>
00019 #include <dlrThread/clientID.h>
00020 #include <dlrThread/exception.h>
00021
00022
00023 namespace dlr {
00024
00025 namespace thread {
00026
00053 template <class Type>
00054 class BroadcastQueue
00055 {
00056 public:
00057
00069 typedef ClientID<Type> ClientID;
00070
00071
00089 BroadcastQueue(size_t maximumLength, size_t numberOfClients=2);
00090
00091
00099 BroadcastQueue(const BroadcastQueue<Type>& other);
00100
00101
00105 virtual
00106 ~BroadcastQueue();
00107
00108
00117 BroadcastQueue<Type>&
00118 operator=(const BroadcastQueue<Type>& other);
00119
00120
00134 void
00135 catchUp(ClientID& clientID);
00136
00137
00166 void
00167 copyBack(const ClientID& clientID, Type& target);
00168
00169
00193 bool
00194 copyBack(const ClientID& clientID, Type& target, double timeout);
00195
00196
00204 size_t
00205 getMaximumLength() {return m_size;}
00206
00207
00256 void
00257 lockBack(const ClientID& clientID);
00258
00259
00280 bool
00281 lockBack(const ClientID& clientID, double timeout);
00282
00283
00298 inline void
00299 popBack(ClientID& clientID);
00300
00301
00320 void
00321 pushFront(const ClientID& clientID, const Type& element);
00322
00323
00361 void
00362 registerClient(ClientID& clientID,
00363 const std::string& clientName="anonymous");
00364
00365
00393 void
00394 synchronize(ClientID& clientID);
00395
00396
00422 bool
00423 synchronize(ClientID& clientID, double timeout);
00424
00425
00471 void
00472 synchronizeQuorum(ClientID& clientID, size_t quorumSize);
00473
00474
00509 bool
00510 synchronizeQuorum(ClientID& clientID, size_t quorumSize, double timeout);
00511
00512
00522 void
00523 unlockBack(const ClientID& clientID);
00524
00525 private:
00526
00535 void
00536 copyOther(const BroadcastQueue<Type>& other);
00537
00538
00563 inline void
00564 declareReadLock(const ClientID& clientID,
00565 bool& overflowFlag,
00566 size_t& headElementNumber);
00567
00568
00583 void
00584 handleSynchronize(ClientID& clientID,
00585 size_t targetNumber,
00586 boost::timed_mutex::scoped_lock& syncLock);
00587
00588
00612 bool
00613 handleSynchronize(ClientID& clientID,
00614 size_t targetNumber,
00615 boost::xtime xtimeout,
00616 boost::timed_mutex::scoped_timed_lock& syncLock);
00617
00618
00623 void
00624 releaseResources();
00625
00626
00627 size_t* m_countPtr;
00628 size_t* m_headIndexPtr;
00629 size_t m_size;
00630
00631 std::map<size_t, std::string>* m_clientNameMapPtr;
00632 Type* m_dataArray;
00633 size_t* m_maxNumberOfClientsPtr;
00634 size_t* m_numberOfClientsPtr;
00635 size_t* m_readLockCountArray;
00636 size_t** m_readLockingFlagsArray;
00637 size_t* m_syncClientCountPtr;
00638 size_t* m_syncElementCountPtr;
00639 size_t* m_syncIndexPtr;
00640 size_t* m_quorumSizePtr;
00641
00642 boost::condition* m_headChangedConditionPtr;
00643 boost::timed_mutex* m_headIndexMutexPtr;
00644
00645
00646
00647 boost::timed_mutex* m_numberOfClientsMutexPtr;
00648
00649 boost::condition* m_syncConditionPtr;
00650 boost::timed_mutex* m_syncMutexPtr;
00651
00652
00653
00654 size_t* m_referenceCountPtr;
00655 };
00656
00657 }
00658
00659 }
00660
00661
00662
00663
00664 #include <algorithm>
00665 #include <iostream>
00666 #include <sstream>
00667 #include <dlrThread/private.h>
00668
00669
00670 namespace dlr {
00671
00672 namespace thread {
00673
00674
00675
00676 template <class Type>
00677 BroadcastQueue<Type>::
00678 BroadcastQueue(size_t maximumLength, size_t numberOfClients)
00679 : m_countPtr(new size_t),
00680 m_headIndexPtr(new size_t),
00681 m_size(maximumLength),
00682 m_clientNameMapPtr(new std::map<size_t, std::string>),
00683 m_dataArray(new Type[maximumLength]),
00684 m_maxNumberOfClientsPtr(new size_t),
00685 m_numberOfClientsPtr(new size_t),
00686 m_readLockCountArray(new size_t[maximumLength]),
00687 m_readLockingFlagsArray(new size_t*[maximumLength]),
00688 m_syncClientCountPtr(new size_t),
00689 m_syncElementCountPtr(new size_t),
00690 m_syncIndexPtr(new size_t),
00691 m_quorumSizePtr(new size_t),
00692 m_headChangedConditionPtr(new boost::condition),
00693 m_headIndexMutexPtr(new boost::timed_mutex),
00694
00695
00696
00697 m_numberOfClientsMutexPtr(new boost::timed_mutex),
00698 m_syncConditionPtr(new boost::condition),
00699 m_syncMutexPtr(new boost::timed_mutex),
00700 m_referenceCountPtr(new size_t)
00701 {
00702 if(numberOfClients == 0) {
00703 DLR_THROW(ValueException, "BroadcastQueue::BroadcastQueue()",
00704 "Argument numberOfClients must be greater than zero.");
00705 }
00706
00707
00708 *m_countPtr = 0;
00709 *m_headIndexPtr = 0;
00710
00711
00712 *m_maxNumberOfClientsPtr = numberOfClients;
00713
00714
00715 *m_numberOfClientsPtr = 0;
00716
00717
00718 for(size_t index0 = 0; index0 < m_size; ++index0) {
00719
00720 m_readLockingFlagsArray[index0] =
00721 new size_t[*m_maxNumberOfClientsPtr];
00722 size_t* beginIterator = m_readLockingFlagsArray[index0];
00723 size_t* endIterator = beginIterator + (*m_maxNumberOfClientsPtr);
00724 std::fill(beginIterator, endIterator, 0);
00725
00726
00727 m_readLockCountArray[index0] = 0;
00728 }
00729
00730
00731 *m_syncClientCountPtr = 0;
00732 *m_syncElementCountPtr = 0;
00733 *m_syncIndexPtr = 0;
00734 *m_quorumSizePtr = 0;
00735 *m_referenceCountPtr = 1;
00736 }
00737
00738
00739
00740 template <class Type>
00741 BroadcastQueue<Type>::
00742 BroadcastQueue(const BroadcastQueue<Type>& other)
00743 : m_countPtr(0),
00744 m_headIndexPtr(0),
00745 m_size(0),
00746 m_clientNameMapPtr(0),
00747 m_dataArray(0),
00748 m_maxNumberOfClientsPtr(0),
00749 m_numberOfClientsPtr(0),
00750 m_readLockCountArray(0),
00751 m_readLockingFlagsArray(0),
00752 m_syncClientCountPtr(0),
00753 m_syncElementCountPtr(0),
00754 m_syncIndexPtr(0),
00755 m_quorumSizePtr(0),
00756 m_headChangedConditionPtr(0),
00757 m_headIndexMutexPtr(0),
00758 m_numberOfClientsMutexPtr(0),
00759 m_syncConditionPtr(0),
00760 m_syncMutexPtr(0),
00761 m_referenceCountPtr(0)
00762 {
00763 this->copyOther(other);
00764 }
00765
00766
00767
00768 template <class Type>
00769 BroadcastQueue<Type>::
00770 ~BroadcastQueue()
00771 {
00772 this->releaseResources();
00773 }
00774
00775
00776
00777 template <class Type>
00778 BroadcastQueue<Type>&
00779 BroadcastQueue<Type>::
00780 operator=(const BroadcastQueue<Type>& other)
00781 {
00782 if(&other != this) {
00783 this->releaseResources();
00784 this->copyOther(other);
00785 }
00786 }
00787
00788
00789
00790
00791 template <class Type>
00792 void
00793 BroadcastQueue<Type>::
00794 catchUp(ClientID& clientID)
00795 {
00796 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
00797 clientID.m_count = *m_countPtr;
00798 clientID.m_index = *m_headIndexPtr;
00799 }
00800
00801
00802
00803
00804 template <class Type>
00805 void
00806 BroadcastQueue<Type>::
00807 copyBack(const ClientID& clientID, Type& target)
00808 {
00809 this->lockBack(clientID);
00810
00811 try {
00812 target = m_dataArray[clientID.m_index];
00813 } catch(...) {
00814 this->unlockBack(clientID);
00815 throw;
00816 }
00817
00818 this->unlockBack(clientID);
00819 }
00820
00821
00822
00823
00824 template <class Type>
00825 bool
00826 BroadcastQueue<Type>::
00827 copyBack(const ClientID& clientID, Type& target, double timeout)
00828 {
00829 if(this->lockBack(clientID, timeout) == false) {
00830 return false;
00831 }
00832
00833 try {
00834 target = m_dataArray[clientID.m_index];
00835 } catch(...) {
00836 this->unlockBack(clientID);
00837 throw;
00838 }
00839
00840 this->unlockBack(clientID);
00841 return true;
00842 }
00843
00844
00845
00846
00847 template <class Type>
00848 void
00849 BroadcastQueue<Type>::
00850 lockBack(const ClientID& clientID)
00851 {
00852
00853
00854
00855 bool overflowFlag = false;
00856 size_t headElementNumber = 0;
00857
00858
00859
00860
00861
00862 {
00863 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873 while(clientID.m_count >= (*m_countPtr)) {
00874 m_headChangedConditionPtr->wait(headLock);
00875 }
00876
00877
00878
00879 this->declareReadLock(clientID, overflowFlag, headElementNumber);
00880
00881 }
00882
00883 if(overflowFlag) {
00884 std::ostringstream message;
00885 message << "Thread "
00886 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00887 << " "
00888 << "has fallen irretrievably far behind the front of the "
00889 << "queue. Accessed element is number "
00890 << clientID.m_count << ", head element is number "
00891 << headElementNumber << ".";
00892 DLR_THROW(OverflowException, "BroadcastQueue::lockBack()",
00893 message.str().c_str());
00894 }
00895 }
00896
00897
00898
00899
00900 template <class Type>
00901 bool
00902 BroadcastQueue<Type>::
00903 lockBack(const ClientID& clientID, double timeout)
00904 {
00905
00906 if(timeout < 0.0) {
00907 timeout = 0.0;
00908 }
00909 boost::xtime xtimeout;
00910 if(getXTime(timeout, xtimeout) == false) {
00911
00912 return false;
00913 }
00914
00915
00916
00917
00918 bool overflowFlag = false;
00919 size_t headElementNumber = 0;
00920
00921
00922
00923
00924
00925 try {
00926
00927 boost::timed_mutex::scoped_timed_lock headLock(
00928 *m_headIndexMutexPtr, xtimeout);
00929
00930
00931
00932
00933 while(clientID.m_count >= (*m_countPtr)) {
00934 if(m_headChangedConditionPtr->timed_wait(headLock, xtimeout)
00935 == false) {
00936
00937 return false;
00938 }
00939 }
00940
00941
00942
00943 this->declareReadLock(clientID, overflowFlag, headElementNumber);
00944
00945 } catch(const boost::lock_error&) {
00946
00947 return false;
00948 }
00949
00950 if(overflowFlag) {
00951 std::ostringstream message;
00952 message << "Thread "
00953 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00954 << " "
00955 << "has fallen irretrievably far behind the front of the "
00956 << "queue. Accessed element is number "
00957 << clientID.m_count << ", head element is number "
00958 << headElementNumber << ".";
00959 DLR_THROW(OverflowException, "BroadcastQueue::lockBack()",
00960 message.str().c_str());
00961 }
00962 return true;
00963 }
00964
00965
00966 template <class Type>
00967 inline void
00968 BroadcastQueue<Type>::
00969 popBack(ClientID& clientID)
00970 {
00971 ++clientID.m_count;
00972 ++clientID.m_index;
00973 if(clientID.m_index >= m_size) {
00974 clientID.m_index = 0;
00975 }
00976 if(clientID.m_count == std::numeric_limits<size_t>::max()) {
00977 std::ostringstream message;
00978 message << "Thread "
00979 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00980 << " "
00981 << "has exceeded the maximum allowable queue position of "
00982 << std::numeric_limits<size_t>::max() - 1 << ".";
00983 DLR_THROW(LogicException, "BroadcastQueue::popBack()",
00984 message.str().c_str());
00985 }
00986 }
00987
00988
00989 template <class Type>
00990 void
00991 BroadcastQueue<Type>::
00992 pushFront(const ClientID& clientID, const Type& element)
00993 {
00994
00995
00996
00997 bool readLockedFlag = false;
00998 bool countOverflowFlag = false;
00999 size_t threadIDNumber = 0;
01000
01001
01002
01003
01004
01005 {
01006
01007
01008 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01009
01010 if(m_readLockCountArray[*m_headIndexPtr] != 0) {
01011
01012
01013 readLockedFlag = true;
01014 size_t* flagsPtr = m_readLockingFlagsArray[*m_headIndexPtr];
01015 while(threadIDNumber < (*m_numberOfClientsPtr)) {
01016 if(flagsPtr[threadIDNumber] != 0) {
01017 break;
01018 }
01019 ++threadIDNumber;
01020 }
01021 } else {
01022
01023 m_dataArray[*m_headIndexPtr] = element;
01024 ++(*m_headIndexPtr);
01025 ++(*m_countPtr);
01026 if((*m_headIndexPtr) >= m_size) {
01027 (*m_headIndexPtr) = 0;
01028 }
01029 if(*m_countPtr == std::numeric_limits<size_t>::max()) {
01030 countOverflowFlag = true;
01031 }
01032
01033
01034 m_headChangedConditionPtr->notify_all();
01035 }
01036 }
01037
01038
01039 if(readLockedFlag) {
01040
01041 if(threadIDNumber == *m_numberOfClientsPtr) {
01042 DLR_THROW(LogicException, "BroadcastQueue::pushFront()",
01043 "Read lock accounting is in an inconsistent state.");
01044 }
01045 std::ostringstream message;
01046 message << "Thread "
01047 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
01048 << " "
01049 << "can't advance the queue because thread "
01050 << (m_clientNameMapPtr->find(threadIDNumber))->second
01051 << " "
01052 << "is still read-locking the tail element.";
01053 DLR_THROW(OverflowException, "BroadcastQueue::pushFront()",
01054 message.str().c_str());
01055 }
01056 if(countOverflowFlag) {
01057 std::ostringstream message;
01058 message << "Queue size exceeds the allowable maximum of "
01059 << std::numeric_limits<size_t>::max() - 1 << " "
01060 << "on a function call from thread "
01061 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
01062 << ".";
01063 DLR_THROW(OverflowException, "BroadcastQueue::pushFront()",
01064 message.str().c_str());
01065 }
01066 }
01067
01068
01069
01070
01071
01072 template <class Type>
01073 void
01074 BroadcastQueue<Type>::
01075 registerClient(ClientID& clientID, const std::string& clientName)
01076 {
01077
01078
01079
01080
01081 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01082
01083
01084
01085
01086 if(*m_numberOfClientsPtr >= (*m_maxNumberOfClientsPtr)) {
01087 size_t newMaxNumberOfClients = (*m_maxNumberOfClientsPtr) << 1;
01088 for(size_t index0 = 0; index0 < m_size; ++index0) {
01089 {
01090
01091
01092
01093
01094
01095 size_t* newFlagsArray = new size_t[newMaxNumberOfClients];
01096 size_t* beginIterator = m_readLockingFlagsArray[index0];
01097 size_t* endIterator = beginIterator + (*m_maxNumberOfClientsPtr);
01098 std::copy(beginIterator, endIterator, newFlagsArray);
01099
01100 beginIterator = newFlagsArray;
01101 endIterator = beginIterator + newMaxNumberOfClients;
01102 beginIterator += (*m_maxNumberOfClientsPtr);
01103 std::fill(beginIterator, endIterator, 0);
01104
01105 delete[] m_readLockingFlagsArray[index0];
01106 m_readLockingFlagsArray[index0] = newFlagsArray;
01107 }
01108 }
01109 *m_maxNumberOfClientsPtr = newMaxNumberOfClients;
01110 }
01111 m_clientNameMapPtr->insert(
01112 std::make_pair(*m_numberOfClientsPtr, clientName));
01113 clientID.m_count = 0;
01114 clientID.m_idNumber = (*m_numberOfClientsPtr)++;
01115 clientID.m_index = 0;
01116 }
01117
01118
01119
01120
01121
01122 template <class Type>
01123 void
01124 BroadcastQueue<Type>::
01125 synchronize(ClientID& clientID)
01126 {
01127 boost::timed_mutex::scoped_lock syncLock(*m_syncMutexPtr);
01128
01129 if((*m_syncClientCountPtr) != 0) {
01130 if((*m_quorumSizePtr) != 0) {
01131 std::ostringstream message;
01132 message << "Can't synchronize with all registered clients "
01133 << "while another thread is waiting for a quorum of size "
01134 << (*m_quorumSizePtr) << ".";
01135 DLR_THROW(StateException, "BroadcastQueue::synchronize()",
01136 message.str().c_str());
01137 }
01138 }
01139 (*m_quorumSizePtr) = 0;
01140
01141 this->handleSynchronize(clientID, (*m_numberOfClientsPtr), syncLock);
01142 }
01143
01144
01145
01146
01147
01148 template <class Type>
01149 bool
01150 BroadcastQueue<Type>::
01151 synchronize(ClientID& clientID, double timeout)
01152 {
01153
01154 if(timeout < 0.0) {
01155 timeout = 0.0;
01156 }
01157 boost::xtime xtimeout;
01158 if(getXTime(timeout, xtimeout) == false) {
01159
01160 return false;
01161 }
01162
01163 bool returnValue = false;
01164 try {
01165
01166 boost::timed_mutex::scoped_timed_lock syncLock(
01167 *m_syncMutexPtr, xtimeout);
01168
01169
01170 if((*m_syncClientCountPtr) != 0) {
01171 if((*m_quorumSizePtr) != 0) {
01172 std::ostringstream message;
01173 message << "Can't synchronize with all registered clients "
01174 << "while another thread is waiting for a quorum of size "
01175 << (*m_quorumSizePtr) << ".";
01176 DLR_THROW(StateException, "BroadcastQueue::synchronize()",
01177 message.str().c_str());
01178 }
01179 }
01180 (*m_quorumSizePtr) = 0;
01181
01182 returnValue =
01183 this->handleSynchronize(
01184 clientID, (*m_numberOfClientsPtr), xtimeout, syncLock);
01185 } catch(const boost::lock_error&) {
01186
01187 }
01188 return returnValue;
01189 }
01190
01191
01192
01193
01194
01195 template <class Type>
01196 void
01197 BroadcastQueue<Type>::
01198 synchronizeQuorum(ClientID& clientID, size_t quorumSize)
01199 {
01200 boost::timed_mutex::scoped_lock syncLock(*m_syncMutexPtr);
01201
01202 if(quorumSize == 0) {
01203 quorumSize = (*m_numberOfClientsPtr);
01204 }
01205
01206 if((*m_syncClientCountPtr) != 0) {
01207 if((*m_quorumSizePtr) == 0) {
01208 DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01209 "Can't start a quorum while another thread is blocking "
01210 "in member function synchronize()");
01211 } else if((*m_quorumSizePtr) != quorumSize) {
01212 std::ostringstream message;
01213 message << "Can't start a quorum of size " << quorumSize << " "
01214 << "while another thread is waiting for a quorum of size "
01215 << (*m_quorumSizePtr) << ".";
01216 DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01217 message.str().c_str());
01218 }
01219 }
01220 (*m_quorumSizePtr) = quorumSize;
01221
01222 this->handleSynchronize(clientID, (*m_quorumSizePtr), syncLock);
01223 }
01224
01225
01226
01227
01228
01229 template <class Type>
01230 bool
01231 BroadcastQueue<Type>::
01232 synchronizeQuorum(ClientID& clientID, size_t quorumSize, double timeout)
01233 {
01234
01235 if(timeout < 0.0) {
01236 timeout = 0.0;
01237 }
01238 boost::xtime xtimeout;
01239 if(getXTime(timeout, xtimeout) == false) {
01240
01241 return false;
01242 }
01243
01244 bool returnValue = false;
01245 try {
01246
01247 boost::timed_mutex::scoped_timed_lock syncLock(
01248 *m_syncMutexPtr, xtimeout);
01249
01250
01251 if(quorumSize == 0) {
01252 quorumSize = (*m_numberOfClientsPtr);
01253 }
01254
01255 if((*m_syncClientCountPtr) != 0) {
01256 if((*m_quorumSizePtr) == 0) {
01257 DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01258 "Can't start a quorum while another thread is blocking "
01259 "in member function synchronize()");
01260 } else if((*m_quorumSizePtr) != quorumSize) {
01261 std::ostringstream message;
01262 message << "Can't start a quorum of size " << quorumSize << " "
01263 << "while another thread is waiting for a quorum of size "
01264 << (*m_quorumSizePtr) << ".";
01265 DLR_THROW(StateException, "BroadcastQueue::synchronizeQuorum()",
01266 message.str().c_str());
01267 }
01268 }
01269 (*m_quorumSizePtr) = quorumSize;
01270
01271 returnValue =
01272 this->handleSynchronize(
01273 clientID, (*m_quorumSizePtr), xtimeout, syncLock);
01274 } catch(const boost::lock_error&) {
01275
01276 }
01277 return returnValue;
01278 }
01279
01280
01281
01282
01283 template <class Type>
01284 void
01285 BroadcastQueue<Type>::
01286 unlockBack(const ClientID& clientID)
01287 {
01288
01289
01290
01291 {
01292 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01293
01294
01295
01296
01297 size_t& lockedFlag =
01298 m_readLockingFlagsArray[clientID.m_index][clientID.m_idNumber];
01299
01300
01301
01302 if(lockedFlag != 0) {
01303 --lockedFlag;
01304 --(m_readLockCountArray[clientID.m_index]);
01305 }
01306 }
01307 }
01308
01309
01310
01311
01312
01313
01314
01315 template <class Type>
01316 void
01317 BroadcastQueue<Type>::
01318 copyOther(const BroadcastQueue<Type>& other)
01319 {
01320 {
01321 boost::timed_mutex::scoped_lock
01322 headLock(*(other.m_headIndexMutexPtr));
01323
01324 m_countPtr = other.m_countPtr;
01325 m_headIndexPtr = other.m_headIndexPtr;
01326 m_size = other.m_size;
01327 m_clientNameMapPtr = other.m_clientNameMapPtr;
01328 m_dataArray = other.m_dataArray;
01329 m_maxNumberOfClientsPtr = other.m_maxNumberOfClientsPtr;
01330 m_numberOfClientsPtr = other.m_numberOfClientsPtr;
01331 m_readLockCountArray = other.m_readLockCountArray;
01332 m_readLockingFlagsArray = other.m_readLockingFlagsArray;
01333 m_syncClientCountPtr = other.m_syncClientCountPtr;
01334 m_syncElementCountPtr = other.m_syncElementCountPtr;
01335 m_syncIndexPtr = other.m_syncIndexPtr;
01336 m_quorumSizePtr = other.m_quorumSizePtr;
01337 m_headIndexMutexPtr = other.m_headIndexMutexPtr;
01338 m_headChangedConditionPtr = other.m_headChangedConditionPtr;
01339
01340
01341
01342 m_numberOfClientsMutexPtr = other.m_numberOfClientsMutexPtr;
01343 m_syncConditionPtr = other.m_syncConditionPtr;
01344 m_syncMutexPtr = other.m_syncMutexPtr;
01345 m_referenceCountPtr = other.m_referenceCountPtr;
01346 ++(*m_referenceCountPtr);
01347 }
01348 }
01349
01350
01351
01352
01353 template <class Type>
01354 inline void
01355 BroadcastQueue<Type>::
01356 declareReadLock(const ClientID& clientID,
01357 bool& overflowFlag,
01358 size_t& headElementNumber)
01359 {
01360
01361
01362 if((clientID.m_count + m_size) < (*m_countPtr)) {
01363 overflowFlag = true;
01364 headElementNumber = *m_countPtr;
01365 } else {
01366
01367 ++(m_readLockingFlagsArray[clientID.m_index][clientID.m_idNumber]);
01368 ++(m_readLockCountArray[clientID.m_index]);
01369 }
01370 }
01371
01372
01373
01374
01375 template <class Type>
01376 void
01377 BroadcastQueue<Type>::
01378 handleSynchronize(ClientID& clientID, size_t targetNumber,
01379 boost::timed_mutex::scoped_lock& syncLock)
01380 {
01381
01382
01383 ++(*m_syncClientCountPtr);
01384 if((*m_syncClientCountPtr) >= targetNumber) {
01385 {
01386
01387
01388 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01389 *m_syncElementCountPtr = *m_countPtr;
01390 *m_syncIndexPtr = *m_headIndexPtr;
01391 }
01392
01393 (*m_syncClientCountPtr) = 0;
01394 m_syncConditionPtr->notify_all();
01395 } else {
01396 m_syncConditionPtr->wait(syncLock);
01397 }
01398 clientID.m_count = *m_syncElementCountPtr;
01399 clientID.m_index = *m_syncIndexPtr;
01400 }
01401
01402
01403
01404
01405 template <class Type>
01406 bool
01407 BroadcastQueue<Type>::
01408 handleSynchronize(ClientID& clientID,
01409 size_t targetNumber,
01410 boost::xtime xtimeout,
01411 boost::timed_mutex::scoped_timed_lock& syncLock)
01412 {
01413
01414
01415 ++(*m_syncClientCountPtr);
01416 if((*m_syncClientCountPtr) >= targetNumber) {
01417 try {
01418
01419
01420 boost::timed_mutex::scoped_timed_lock
01421 headLock(*m_headIndexMutexPtr, xtimeout);
01422 *m_syncElementCountPtr = *m_countPtr;
01423 *m_syncIndexPtr = *m_headIndexPtr;
01424 } catch(const boost::lock_error&) {
01425 --(*m_syncClientCountPtr);
01426 return false;
01427 }
01428
01429 (*m_syncClientCountPtr) = 0;
01430 m_syncConditionPtr->notify_all();
01431 } else {
01432 if(m_syncConditionPtr->timed_wait(syncLock, xtimeout) == false) {
01433
01434 --(*m_syncClientCountPtr);
01435 return false;
01436 }
01437 }
01438 clientID.m_count = *m_syncElementCountPtr;
01439 clientID.m_index = *m_syncIndexPtr;
01440 return true;
01441 }
01442
01443
01444
01445
01446 template <class Type>
01447 void
01448 BroadcastQueue<Type>::
01449 releaseResources()
01450 {
01451 bool isShared = true;
01452 {
01453 boost::timed_mutex::scoped_lock headLock(*m_headIndexMutexPtr);
01454 if(--(*m_referenceCountPtr) == 0) {
01455 isShared = false;
01456 }
01457 }
01458
01459 if(!isShared) {
01460 delete m_countPtr;
01461 delete m_headIndexPtr;
01462 delete m_clientNameMapPtr;
01463 delete[] m_dataArray;
01464 delete m_maxNumberOfClientsPtr;
01465 delete m_numberOfClientsPtr;
01466 delete[] m_readLockCountArray;
01467 for(size_t index0 = 0; index0 < m_size; ++index0) {
01468 delete[] m_readLockingFlagsArray[index0];
01469 }
01470 delete[] m_readLockingFlagsArray;
01471 delete m_syncClientCountPtr;
01472 delete m_syncElementCountPtr;
01473 delete m_syncIndexPtr;
01474 delete m_quorumSizePtr;
01475 delete m_headChangedConditionPtr;
01476 delete m_headIndexMutexPtr;
01477
01478
01479
01480 delete m_numberOfClientsMutexPtr;
01481 delete m_syncConditionPtr;
01482 delete m_syncMutexPtr;
01483 delete m_referenceCountPtr;
01484 }
01485 }
01486
01487 }
01488
01489 }
01490
01491
01492
01493 #endif